1# dialects/oracle/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r"""
11.. dialect:: oracle
12 :name: Oracle Database
13 :normal_support: 11+
14 :best_effort: 9+
15
16
17Auto Increment Behavior
18-----------------------
19
20SQLAlchemy Table objects which include integer primary keys are usually assumed
21to have "autoincrementing" behavior, meaning they can generate their own
22primary key values upon INSERT. For use within Oracle Database, two options are
23available, which are the use of IDENTITY columns (Oracle Database 12 and above
24only) or the association of a SEQUENCE with the column.
25
26Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
27~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
28
29Starting from version 12, Oracle Database can make use of identity columns
30using the :class:`_sql.Identity` to specify the autoincrementing behavior::
31
32 t = Table(
33 "mytable",
34 metadata,
35 Column("id", Integer, Identity(start=3), primary_key=True),
36 Column(...),
37 ...,
38 )
39
40The CREATE TABLE for the above :class:`_schema.Table` object would be:
41
42.. sourcecode:: sql
43
44 CREATE TABLE mytable (
45 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
46 ...,
47 PRIMARY KEY (id)
48 )
49
50The :class:`_schema.Identity` object support many options to control the
51"autoincrementing" behavior of the column, like the starting value, the
52incrementing value, etc. In addition to the standard options, Oracle Database
53supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
54default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
55setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
56in conjunction with a 'BY DEFAULT' identity column.
57
58Using a SEQUENCE (all Oracle Database versions)
59~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
60
61Older version of Oracle Database had no "autoincrement" feature: SQLAlchemy
62relies upon sequences to produce these values. With the older Oracle Database
63versions, *a sequence must always be explicitly specified to enable
64autoincrement*. This is divergent with the majority of documentation examples
65which assume the usage of an autoincrement-capable database. To specify
66sequences, use the sqlalchemy.schema.Sequence object which is passed to a
67Column construct::
68
69 t = Table(
70 "mytable",
71 metadata,
72 Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
73 Column(...),
74 ...,
75 )
76
77This step is also required when using table reflection, i.e. autoload_with=engine::
78
79 t = Table(
80 "mytable",
81 metadata,
82 Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
83 autoload_with=engine,
84 )
85
86.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
87 in a :class:`_schema.Column` to specify the option of an autoincrementing
88 column.
89
90.. _oracle_isolation_level:
91
92Transaction Isolation Level / Autocommit
93----------------------------------------
94
95Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
96isolation. The AUTOCOMMIT isolation level is also supported by the
97python-oracledb and cx_Oracle dialects.
98
99To set using per-connection execution options::
100
101 connection = engine.connect()
102 connection = connection.execution_options(isolation_level="AUTOCOMMIT")
103
104For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects sets
105the level at the session level using ``ALTER SESSION``, which is reverted back
106to its default setting when the connection is returned to the connection pool.
107
108Valid values for ``isolation_level`` include:
109
110* ``READ COMMITTED``
111* ``AUTOCOMMIT``
112* ``SERIALIZABLE``
113
114.. note:: The implementation for the
115 :meth:`_engine.Connection.get_isolation_level` method as implemented by the
116 Oracle Database dialects necessarily force the start of a transaction using the
117 Oracle Database DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function; otherwise no
118 level is normally readable.
119
120 Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
121 raise an exception if the ``v$transaction`` view is not available due to
122 permissions or other reasons, which is a common occurrence in Oracle Database
123 installations.
124
125 The python-oracledb and cx_Oracle dialects attempt to call the
126 :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
127 its first connection to the database in order to acquire the
128 "default"isolation level. This default level is necessary so that the level
129 can be reset on a connection after it has been temporarily modified using
130 :meth:`_engine.Connection.execution_options` method. In the common event
131 that the :meth:`_engine.Connection.get_isolation_level` method raises an
132 exception due to ``v$transaction`` not being readable as well as any other
133 database-related failure, the level is assumed to be "READ COMMITTED". No
134 warning is emitted for this initial first-connect condition as it is
135 expected to be a common restriction on Oracle databases.
136
137.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle dialect
138 as well as the notion of a default isolation level
139
140.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
141 reading of the isolation level.
142
143.. versionchanged:: 1.3.22 In the event that the default isolation
144 level cannot be read due to permissions on the v$transaction view as
145 is common in Oracle installations, the default isolation level is hardcoded
146 to "READ COMMITTED" which was the behavior prior to 1.3.21.
147
148.. seealso::
149
150 :ref:`dbapi_autocommit`
151
152Identifier Casing
153-----------------
154
155In Oracle Database, the data dictionary represents all case insensitive
156identifier names using UPPERCASE text. This is in contradiction to the
157expectations of SQLAlchemy, which assume a case insensitive name is represented
158as lowercase text.
159
160As an example of case insensitive identifier names, consider the following table:
161
162.. sourcecode:: sql
163
164 CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)
165
166If you were to ask Oracle Database for information about this table, the
167table name would be reported as ``MYTABLE`` and the column name would
168be reported as ``IDENTIFIER``. Compare to most other databases such as
169PostgreSQL and MySQL which would report these names as ``mytable`` and
170``identifier``. The names are **not quoted, therefore are case insensitive**.
171The special casing of ``MyTable`` and ``Identifier`` would only be maintained
172if they were quoted in the table definition:
173
174.. sourcecode:: sql
175
176 CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)
177
178When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase name
179is considered to be case insensitive**. So the following table assumes
180case insensitive names::
181
182 Table("mytable", metadata, Column("identifier", Integer, primary_key=True))
183
184Whereas when mixed case or UPPERCASE names are used, case sensitivity is
185assumed::
186
187 Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))
188
189A similar situation occurs at the database driver level when emitting a
190textual SQL SELECT statement and looking at column names in the DBAPI
191``cursor.description`` attribute. A database like PostgreSQL will normalize
192case insensitive names to be lowercase::
193
194 >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
195 >>> pg_connection = pg_engine.connect()
196 >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
197 >>> result.cursor.description
198 (Column(name='somename', type_code=23),)
199
200Whereas Oracle normalizes them to UPPERCASE::
201
202 >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
203 >>> oracle_connection = oracle_engine.connect()
204 >>> result = oracle_connection.exec_driver_sql(
205 ... "SELECT 1 AS SomeName FROM DUAL"
206 ... )
207 >>> result.cursor.description
208 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
209
210In order to achieve cross-database parity for the two cases of a. table
211reflection and b. textual-only SQL statement round trips, SQLAlchemy performs a step
212called **name normalization** when using the Oracle dialect. This process may
213also apply to other third party dialects that have similar UPPERCASE handling
214of case insensitive names.
215
216When using name normalization, SQLAlchemy attempts to detect if a name is
217case insensitive by checking if all characters are UPPERCASE letters only;
218if so, then it assumes this is a case insensitive name and is delivered as
219a lowercase name.
220
221For table reflection, a tablename that is seen represented as all UPPERCASE
222in Oracle Database's catalog tables will be assumed to have a case insensitive
223name. This is what allows the ``Table`` definition to use lower case names
224and be equally compatible from a reflection point of view on Oracle Database
225and all other databases such as PostgreSQL and MySQL::
226
227 # matches a table created with CREATE TABLE mytable
228 Table("mytable", metadata, autoload_with=some_engine)
229
230Above, the all lowercase name ``"mytable"`` is case insensitive; it will match
231a table reported by PostgreSQL as ``"mytable"`` and a table reported by
232Oracle as ``"MYTABLE"``. If name normalization were not present, it would
233not be possible for the above :class:`.Table` definition to be introspectable
234in a cross-database way, since we are dealing with a case insensitive name
235that is not reported by each database in the same way.
236
237Case sensitivity can be forced on in this case, such as if we wanted to represent
238the quoted tablename ``"MYTABLE"`` with that exact casing, most simply by using
239that casing directly, which will be seen as a case sensitive name::
240
241 # matches a table created with CREATE TABLE "MYTABLE"
242 Table("MYTABLE", metadata, autoload_with=some_engine)
243
244For the unusual case of a quoted all-lowercase name, the :class:`.quoted_name`
245construct may be used::
246
247 from sqlalchemy import quoted_name
248
249 # matches a table created with CREATE TABLE "mytable"
250 Table(
251 quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
252 )
253
254Name normalization also takes place when handling result sets from **purely
255textual SQL strings**, that have no other :class:`.Table` or :class:`.Column`
256metadata associated with them. This includes SQL strings executed using
257:meth:`.Connection.exec_driver_sql` and SQL strings executed using the
258:func:`.text` construct which do not include :class:`.Column` metadata.
259
260Returning to the Oracle Database SELECT statement, we see that even though
261``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
262name normalizes this to ``somename``::
263
264 >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
265 >>> oracle_connection = oracle_engine.connect()
266 >>> result = oracle_connection.exec_driver_sql(
267 ... "SELECT 1 AS SomeName FROM DUAL"
268 ... )
269 >>> result.cursor.description
270 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
271 >>> result.keys()
272 RMKeyView(['somename'])
273
274The single scenario where the above behavior produces inaccurate results
275is when using an all-uppercase, quoted name. SQLAlchemy has no way to determine
276that a particular name in ``cursor.description`` was quoted, and is therefore
277case sensitive, or was not quoted, and should be name normalized::
278
279 >>> result = oracle_connection.exec_driver_sql(
280 ... 'SELECT 1 AS "SOMENAME" FROM DUAL'
281 ... )
282 >>> result.cursor.description
283 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
284 >>> result.keys()
285 RMKeyView(['somename'])
286
287For this case, a new feature will be available in SQLAlchemy 2.1 to disable
288the name normalization behavior in specific cases.
289
290
291.. _oracle_max_identifier_lengths:
292
293Maximum Identifier Lengths
294--------------------------
295
296SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
297Database. This affects generated SQL label names as well as the generation of
298constraint names, particularly in the case where the constraint naming
299convention feature described at :ref:`constraint_naming_conventions` is being
300used.
301
302Oracle Database 12.2 increased the default maximum identifier length from 30 to
303128. As of SQLAlchemy 1.4, the default maximum identifier length for the Oracle
304dialects is 128 characters. Upon first connection, the maximum length actually
305supported by the database is obtained. In all cases, setting the
306:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
307change and the value given will be used as is::
308
309 engine = create_engine(
310 "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
311 max_identifier_length=30,
312 )
313
314If :paramref:`_sa.create_engine.max_identifier_length` is not set, the oracledb
315dialect internally uses the ``max_identifier_length`` attribute available on
316driver connections since python-oracledb version 2.5. When using an older
317driver version, or using the cx_Oracle dialect, SQLAlchemy will instead attempt
318to use the query ``SELECT value FROM v$parameter WHERE name = 'compatible'``
319upon first connect in order to determine the effective compatibility version of
320the database. The "compatibility" version is a version number that is
321independent of the actual database version. It is used to assist database
322migration. It is configured by an Oracle Database initialization parameter. The
323compatibility version then determines the maximum allowed identifier length for
324the database. If the V$ view is not available, the database version information
325is used instead.
326
327The maximum identifier length comes into play both when generating anonymized
328SQL labels in SELECT statements, but more crucially when generating constraint
329names from a naming convention. It is this area that has created the need for
330SQLAlchemy to change this default conservatively. For example, the following
331naming convention produces two very different constraint names based on the
332identifier length::
333
334 from sqlalchemy import Column
335 from sqlalchemy import Index
336 from sqlalchemy import Integer
337 from sqlalchemy import MetaData
338 from sqlalchemy import Table
339 from sqlalchemy.dialects import oracle
340 from sqlalchemy.schema import CreateIndex
341
342 m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
343
344 t = Table(
345 "t",
346 m,
347 Column("some_column_name_1", Integer),
348 Column("some_column_name_2", Integer),
349 Column("some_column_name_3", Integer),
350 )
351
352 ix = Index(
353 None,
354 t.c.some_column_name_1,
355 t.c.some_column_name_2,
356 t.c.some_column_name_3,
357 )
358
359 oracle_dialect = oracle.dialect(max_identifier_length=30)
360 print(CreateIndex(ix).compile(dialect=oracle_dialect))
361
362With an identifier length of 30, the above CREATE INDEX looks like:
363
364.. sourcecode:: sql
365
366 CREATE INDEX ix_some_column_name_1s_70cd ON t
367 (some_column_name_1, some_column_name_2, some_column_name_3)
368
369However with length of 128, it becomes::
370
371.. sourcecode:: sql
372
373 CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
374 (some_column_name_1, some_column_name_2, some_column_name_3)
375
376Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
377Database version 12.2 or greater are therefore subject to the scenario of a
378database migration that wishes to "DROP CONSTRAINT" on a name that was
379previously generated with the shorter length. This migration will fail when
380the identifier length is changed without the name of the index or constraint
381first being adjusted. Such applications are strongly advised to make use of
382:paramref:`_sa.create_engine.max_identifier_length` in order to maintain
383control of the generation of truncated names, and to fully review and test all
384database migrations in a staging environment when changing this value to ensure
385that the impact of this change has been mitigated.
386
387.. versionchanged:: 1.4 the default max_identifier_length for Oracle Database
388 is 128 characters, which is adjusted down to 30 upon first connect if the
389 Oracle Database, or its compatibility setting, are lower than version 12.2.
390
391
392LIMIT/OFFSET/FETCH Support
393--------------------------
394
395Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make use
396of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming Oracle Database 12c or
397above, and assuming the SELECT statement is not embedded within a compound
398statement like UNION. This syntax is also available directly by using the
399:meth:`_sql.Select.fetch` method.
400
401.. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
402 ROW / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
403 :meth:`_sql.Select.offset` usage including within the ORM and legacy
404 :class:`_orm.Query`. To force the legacy behavior using window functions,
405 specify the ``enable_offset_fetch=False`` dialect parameter to
406 :func:`_sa.create_engine`.
407
408The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
409version by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`,
410which will force the use of "legacy" mode that makes use of window functions.
411This mode is also selected automatically when using a version of Oracle
412Database prior to 12c.
413
414When using legacy mode, or when a :class:`.Select` statement with limit/offset
415is embedded in a compound statement, an emulated approach for LIMIT / OFFSET
416based on window functions is used, which involves creation of a subquery using
417``ROW_NUMBER`` that is prone to performance issues as well as SQL construction
418issues for complex statements. However, this approach is supported by all
419Oracle Database versions. See notes below.
420
421Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
422~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
423
424If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
425ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
426Oracle Database version prior to 12c, the following notes apply:
427
428* SQLAlchemy currently makes use of ROWNUM to achieve
429 LIMIT/OFFSET; the exact methodology is taken from
430 https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
431
432* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
433 the usage of this optimization directive, specify ``optimize_limits=True``
434 to :func:`_sa.create_engine`.
435
436 .. versionchanged:: 1.4
437
438 The Oracle Database dialect renders limit/offset integer values using a
439 "post compile" scheme which renders the integer directly before passing
440 the statement to the cursor for execution. The ``use_binds_for_limits``
441 flag no longer has an effect.
442
443 .. seealso::
444
445 :ref:`change_4808`.
446
447.. _oracle_returning:
448
449RETURNING Support
450-----------------
451
452Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
453statements that are invoked with a single collection of bound parameters (that
454is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
455support RETURNING with :term:`executemany` statements). Multiple rows may be
456returned as well.
457
458.. versionchanged:: 2.0 the Oracle Database backend has full support for
459 RETURNING on parity with other backends.
460
461
462ON UPDATE CASCADE
463-----------------
464
465Oracle Database doesn't have native ON UPDATE CASCADE functionality. A trigger
466based solution is available at
467https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
468
469When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
470cascading updates - specify ForeignKey objects using the
471"deferrable=True, initially='deferred'" keyword arguments,
472and specify "passive_updates=False" on each relationship().
473
474Oracle Database 8 Compatibility
475-------------------------------
476
477.. warning:: The status of Oracle Database 8 compatibility is not known for
478 SQLAlchemy 2.0.
479
480When Oracle Database 8 is detected, the dialect internally configures itself to
481the following behaviors:
482
483* the use_ansi flag is set to False. This has the effect of converting all
484 JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
485 makes use of Oracle's (+) operator.
486
487* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
488 the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
489 instead. This because these types don't seem to work correctly on Oracle 8
490 even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
491 :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
492 NVARCHAR2 and NCLOB.
493
494
495Synonym/DBLINK Reflection
496-------------------------
497
498When using reflection with Table objects, the dialect can optionally search
499for tables indicated by synonyms, either in local or remote schemas or
500accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
501a keyword argument to the :class:`_schema.Table` construct::
502
503 some_table = Table(
504 "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
505 )
506
507When this flag is set, the given name (such as ``some_table`` above) will be
508searched not just in the ``ALL_TABLES`` view, but also within the
509``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
510name. If the synonym is located and refers to a DBLINK, the Oracle Database
511dialects know how to locate the table's information using DBLINK syntax(e.g.
512``@dblink``).
513
514``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
515accepted, including methods such as :meth:`_schema.MetaData.reflect` and
516:meth:`_reflection.Inspector.get_columns`.
517
518If synonyms are not in use, this flag should be left disabled.
519
520.. _oracle_constraint_reflection:
521
522Constraint Reflection
523---------------------
524
525The Oracle Database dialects can return information about foreign key, unique,
526and CHECK constraints, as well as indexes on tables.
527
528Raw information regarding these constraints can be acquired using
529:meth:`_reflection.Inspector.get_foreign_keys`,
530:meth:`_reflection.Inspector.get_unique_constraints`,
531:meth:`_reflection.Inspector.get_check_constraints`, and
532:meth:`_reflection.Inspector.get_indexes`.
533
534.. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE and
535 CHECK constraints.
536
537When using reflection at the :class:`_schema.Table` level, the
538:class:`_schema.Table`
539will also include these constraints.
540
541Note the following caveats:
542
543* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
544 Oracle Database builds a special "IS NOT NULL" constraint for columns that
545 specify "NOT NULL". This constraint is **not** returned by default; to
546 include the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
547
548 from sqlalchemy import create_engine, inspect
549
550 engine = create_engine(
551 "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
552 )
553 inspector = inspect(engine)
554 all_check_constraints = inspector.get_check_constraints(
555 "some_table", include_all=True
556 )
557
558* in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE constraint
559 will **not** be available as a :class:`.UniqueConstraint` object, as Oracle
560 Database mirrors unique constraints with a UNIQUE index in most cases (the
561 exception seems to be when two or more unique constraints represent the same
562 columns); the :class:`_schema.Table` will instead represent these using
563 :class:`.Index` with the ``unique=True`` flag set.
564
565* Oracle Database creates an implicit index for the primary key of a table;
566 this index is **excluded** from all index results.
567
568* the list of columns reflected for an index will not include column names
569 that start with SYS_NC.
570
571Table names with SYSTEM/SYSAUX tablespaces
572-------------------------------------------
573
574The :meth:`_reflection.Inspector.get_table_names` and
575:meth:`_reflection.Inspector.get_temp_table_names`
576methods each return a list of table names for the current engine. These methods
577are also part of the reflection which occurs within an operation such as
578:meth:`_schema.MetaData.reflect`. By default,
579these operations exclude the ``SYSTEM``
580and ``SYSAUX`` tablespaces from the operation. In order to change this, the
581default list of tablespaces excluded can be changed at the engine level using
582the ``exclude_tablespaces`` parameter::
583
584 # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
585 e = create_engine(
586 "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
587 exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
588 )
589
590.. _oracle_float_support:
591
592FLOAT / DOUBLE Support and Behaviors
593------------------------------------
594
595The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
596datatypes that resolve to the "least surprising" datatype for a given backend.
597For Oracle Database, this means they resolve to the ``FLOAT`` and ``DOUBLE``
598types::
599
600 >>> from sqlalchemy import cast, literal, Float
601 >>> from sqlalchemy.dialects import oracle
602 >>> float_datatype = Float()
603 >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
604 CAST(:param_1 AS FLOAT)
605
606Oracle's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for ``NUMBER``. Oracle
607Database stores ``NUMBER`` values with full precision, not floating point
608precision, which means that ``FLOAT`` / ``DOUBLE`` do not actually behave like
609native FP values. Oracle Database instead offers special datatypes
610``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real 4- and 8- byte FP
611values.
612
613SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT` and
614:class:`.BINARY_DOUBLE`. To use the :class:`.Float` or :class:`.Double`
615datatypes in a database agnostic way, while allowing Oracle backends to utilize
616one of these types, use the :meth:`.TypeEngine.with_variant` method to set up a
617variant::
618
619 >>> from sqlalchemy import cast, literal, Float
620 >>> from sqlalchemy.dialects import oracle
621 >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
622 >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
623 CAST(:param_1 AS BINARY_FLOAT)
624
625E.g. to use this datatype in a :class:`.Table` definition::
626
627 my_table = Table(
628 "my_table",
629 metadata,
630 Column(
631 "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
632 ),
633 )
634
635DateTime Compatibility
636----------------------
637
638Oracle Database has no datatype known as ``DATETIME``, it instead has only
639``DATE``, which can actually store a date and time value. For this reason, the
640Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
641subclass of :class:`.DateTime`. This type has no special behavior, and is only
642present as a "marker" for this type; additionally, when a database column is
643reflected and the type is reported as ``DATE``, the time-supporting
644:class:`_oracle.DATE` type is used.
645
646.. _oracle_table_options:
647
648Oracle Database Table Options
649-----------------------------
650
651The CREATE TABLE phrase supports the following options with Oracle Database
652dialects in conjunction with the :class:`_schema.Table` construct:
653
654
655* ``ON COMMIT``::
656
657 Table(
658 "some_table",
659 metadata,
660 ...,
661 prefixes=["GLOBAL TEMPORARY"],
662 oracle_on_commit="PRESERVE ROWS",
663 )
664
665*
666 ``COMPRESS``::
667
668 Table(
669 "mytable", metadata, Column("data", String(32)), oracle_compress=True
670 )
671
672 Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)
673
674 The ``oracle_compress`` parameter accepts either an integer compression
675 level, or ``True`` to use the default compression level.
676
677*
678 ``TABLESPACE``::
679
680 Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")
681
682 The ``oracle_tablespace`` parameter specifies the tablespace in which the
683 table is to be created. This is useful when you want to create a table in a
684 tablespace other than the default tablespace of the user.
685
686 .. versionadded:: 2.0.37
687
688.. _oracle_index_options:
689
690Oracle Database Specific Index Options
691--------------------------------------
692
693Bitmap Indexes
694~~~~~~~~~~~~~~
695
696You can specify the ``oracle_bitmap`` parameter to create a bitmap index
697instead of a B-tree index::
698
699 Index("my_index", my_table.c.data, oracle_bitmap=True)
700
701Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
702check for such limitations, only the database will.
703
704Index compression
705~~~~~~~~~~~~~~~~~
706
707Oracle Database has a more efficient storage mode for indexes containing lots
708of repeated values. Use the ``oracle_compress`` parameter to turn on key
709compression::
710
711 Index("my_index", my_table.c.data, oracle_compress=True)
712
713 Index(
714 "my_index",
715 my_table.c.data1,
716 my_table.c.data2,
717 unique=True,
718 oracle_compress=1,
719 )
720
721The ``oracle_compress`` parameter accepts either an integer specifying the
722number of prefix columns to compress, or ``True`` to use the default (all
723columns for non-unique indexes, all but the last column for unique indexes).
724
725.. _oracle_vector_datatype:
726
727VECTOR Datatype
728---------------
729
730Oracle Database 23ai introduced a new VECTOR datatype for artificial intelligence
731and machine learning search operations. The VECTOR datatype is a homogeneous array
732of 8-bit signed integers, 8-bit unsigned integers (binary), 32-bit floating-point numbers,
733or 64-bit floating-point numbers.
734
735.. seealso::
736
737 `Using VECTOR Data
738 <https://python-oracledb.readthedocs.io/en/latest/user_guide/vector_data_type.html>`_ - in the documentation
739 for the :ref:`oracledb` driver.
740
741.. versionadded:: 2.0.41
742
743CREATE TABLE support for VECTOR
744~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
745
746With the :class:`.VECTOR` datatype, you can specify the dimension for the data
747and the storage format. Valid values for storage format are enum values from
748:class:`.VectorStorageFormat`. To create a table that includes a
749:class:`.VECTOR` column::
750
751 from sqlalchemy.dialects.oracle import VECTOR, VectorStorageFormat
752
753 t = Table(
754 "t1",
755 metadata,
756 Column("id", Integer, primary_key=True),
757 Column(
758 "embedding",
759 VECTOR(dim=3, storage_format=VectorStorageFormat.FLOAT32),
760 ),
761 Column(...),
762 ...,
763 )
764
765Vectors can also be defined with an arbitrary number of dimensions and formats.
766This allows you to specify vectors of different dimensions with the various
767storage formats mentioned above.
768
769**Examples**
770
771* In this case, the storage format is flexible, allowing any vector type data to be inserted,
772 such as INT8 or BINARY etc::
773
774 vector_col: Mapped[array.array] = mapped_column(VECTOR(dim=3))
775
776* The dimension is flexible in this case, meaning that any dimension vector can be used::
777
778 vector_col: Mapped[array.array] = mapped_column(
779 VECTOR(storage_format=VectorStorageType.INT8)
780 )
781
782* Both the dimensions and the storage format are flexible::
783
784 vector_col: Mapped[array.array] = mapped_column(VECTOR)
785
786Python Datatypes for VECTOR
787~~~~~~~~~~~~~~~~~~~~~~~~~~~
788
789VECTOR data can be inserted using Python list or Python ``array.array()`` objects.
790Python arrays of type FLOAT (32-bit), DOUBLE (64-bit), or INT (8-bit signed integer)
791are used as bind values when inserting VECTOR columns::
792
793 from sqlalchemy import insert, select
794
795 with engine.begin() as conn:
796 conn.execute(
797 insert(t1),
798 {"id": 1, "embedding": [1, 2, 3]},
799 )
800
801VECTOR Indexes
802~~~~~~~~~~~~~~
803
804The VECTOR feature supports an Oracle-specific parameter ``oracle_vector``
805on the :class:`.Index` construct, which allows the construction of VECTOR
806indexes.
807
808To utilize VECTOR indexing, set the ``oracle_vector`` parameter to True to use
809the default values provided by Oracle. HNSW is the default indexing method::
810
811 from sqlalchemy import Index
812
813 Index(
814 "vector_index",
815 t1.c.embedding,
816 oracle_vector=True,
817 )
818
819The full range of parameters for vector indexes are available by using the
820:class:`.VectorIndexConfig` dataclass in place of a boolean; this dataclass
821allows full configuration of the index::
822
823 Index(
824 "hnsw_vector_index",
825 t1.c.embedding,
826 oracle_vector=VectorIndexConfig(
827 index_type=VectorIndexType.HNSW,
828 distance=VectorDistanceType.COSINE,
829 accuracy=90,
830 hnsw_neighbors=5,
831 hnsw_efconstruction=20,
832 parallel=10,
833 ),
834 )
835
836 Index(
837 "ivf_vector_index",
838 t1.c.embedding,
839 oracle_vector=VectorIndexConfig(
840 index_type=VectorIndexType.IVF,
841 distance=VectorDistanceType.DOT,
842 accuracy=90,
843 ivf_neighbor_partitions=5,
844 ),
845 )
846
847For complete explanation of these parameters, see the Oracle documentation linked
848below.
849
850.. seealso::
851
852 `CREATE VECTOR INDEX <https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B396C369-54BB-4098-A0DD-7C54B3A0D66F>`_ - in the Oracle documentation
853
854
855
856Similarity Searching
857~~~~~~~~~~~~~~~~~~~~
858
859When using the :class:`_oracle.VECTOR` datatype with a :class:`.Column` or similar
860ORM mapped construct, additional comparison functions are available, including:
861
862* ``l2_distance``
863* ``cosine_distance``
864* ``inner_product``
865
866Example Usage::
867
868 result_vector = connection.scalars(
869 select(t1).order_by(t1.embedding.l2_distance([2, 3, 4])).limit(3)
870 )
871
872 for user in vector:
873 print(user.id, user.embedding)
874
875FETCH APPROXIMATE support
876~~~~~~~~~~~~~~~~~~~~~~~~~
877
878Approximate vector search can only be performed when all syntax and semantic
879rules are satisfied, the corresponding vector index is available, and the
880query optimizer determines to perform it. If any of these conditions are
881unmet, then an approximate search is not performed. In this case the query
882returns exact results.
883
884To enable approximate searching during similarity searches on VECTORS, the
885``oracle_fetch_approximate`` parameter may be used with the :meth:`.Select.fetch`
886clause to add ``FETCH APPROX`` to the SELECT statement::
887
888 select(users_table).fetch(5, oracle_fetch_approximate=True)
889
890""" # noqa
891
892from __future__ import annotations
893
894from collections import defaultdict
895from dataclasses import fields
896from functools import lru_cache
897from functools import wraps
898import re
899
900from . import dictionary
901from .types import _OracleBoolean
902from .types import _OracleDate
903from .types import BFILE
904from .types import BINARY_DOUBLE
905from .types import BINARY_FLOAT
906from .types import DATE
907from .types import FLOAT
908from .types import INTERVAL
909from .types import LONG
910from .types import NCLOB
911from .types import NUMBER
912from .types import NVARCHAR2 # noqa
913from .types import OracleRaw # noqa
914from .types import RAW
915from .types import ROWID # noqa
916from .types import TIMESTAMP
917from .types import VARCHAR2 # noqa
918from .vector import VECTOR
919from .vector import VectorIndexConfig
920from .vector import VectorIndexType
921from ... import Computed
922from ... import exc
923from ... import schema as sa_schema
924from ... import sql
925from ... import util
926from ...engine import default
927from ...engine import ObjectKind
928from ...engine import ObjectScope
929from ...engine import reflection
930from ...engine.reflection import ReflectionDefaults
931from ...sql import and_
932from ...sql import bindparam
933from ...sql import compiler
934from ...sql import expression
935from ...sql import func
936from ...sql import null
937from ...sql import or_
938from ...sql import select
939from ...sql import selectable as sa_selectable
940from ...sql import sqltypes
941from ...sql import util as sql_util
942from ...sql import visitors
943from ...sql.visitors import InternalTraversal
944from ...types import BLOB
945from ...types import CHAR
946from ...types import CLOB
947from ...types import DOUBLE_PRECISION
948from ...types import INTEGER
949from ...types import NCHAR
950from ...types import NVARCHAR
951from ...types import REAL
952from ...types import VARCHAR
953
954RESERVED_WORDS = set(
955 "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
956 "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
957 "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
958 "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
959 "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
960 "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
961 "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
962 "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
963 "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
964)
965
966NO_ARG_FNS = set(
967 "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
968)
969
970
971colspecs = {
972 sqltypes.Boolean: _OracleBoolean,
973 sqltypes.Interval: INTERVAL,
974 sqltypes.DateTime: DATE,
975 sqltypes.Date: _OracleDate,
976}
977
978ischema_names = {
979 "VARCHAR2": VARCHAR,
980 "NVARCHAR2": NVARCHAR,
981 "CHAR": CHAR,
982 "NCHAR": NCHAR,
983 "DATE": DATE,
984 "NUMBER": NUMBER,
985 "BLOB": BLOB,
986 "BFILE": BFILE,
987 "CLOB": CLOB,
988 "NCLOB": NCLOB,
989 "TIMESTAMP": TIMESTAMP,
990 "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
991 "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
992 "INTERVAL DAY TO SECOND": INTERVAL,
993 "RAW": RAW,
994 "FLOAT": FLOAT,
995 "DOUBLE PRECISION": DOUBLE_PRECISION,
996 "REAL": REAL,
997 "LONG": LONG,
998 "BINARY_DOUBLE": BINARY_DOUBLE,
999 "BINARY_FLOAT": BINARY_FLOAT,
1000 "ROWID": ROWID,
1001 "VECTOR": VECTOR,
1002}
1003
1004
1005class OracleTypeCompiler(compiler.GenericTypeCompiler):
1006 # Note:
1007 # Oracle DATE == DATETIME
1008 # Oracle does not allow milliseconds in DATE
1009 # Oracle does not support TIME columns
1010
1011 def visit_datetime(self, type_, **kw):
1012 return self.visit_DATE(type_, **kw)
1013
1014 def visit_float(self, type_, **kw):
1015 return self.visit_FLOAT(type_, **kw)
1016
1017 def visit_double(self, type_, **kw):
1018 return self.visit_DOUBLE_PRECISION(type_, **kw)
1019
1020 def visit_unicode(self, type_, **kw):
1021 if self.dialect._use_nchar_for_unicode:
1022 return self.visit_NVARCHAR2(type_, **kw)
1023 else:
1024 return self.visit_VARCHAR2(type_, **kw)
1025
1026 def visit_INTERVAL(self, type_, **kw):
1027 return "INTERVAL DAY%s TO SECOND%s" % (
1028 type_.day_precision is not None
1029 and "(%d)" % type_.day_precision
1030 or "",
1031 type_.second_precision is not None
1032 and "(%d)" % type_.second_precision
1033 or "",
1034 )
1035
1036 def visit_LONG(self, type_, **kw):
1037 return "LONG"
1038
1039 def visit_TIMESTAMP(self, type_, **kw):
1040 if getattr(type_, "local_timezone", False):
1041 return "TIMESTAMP WITH LOCAL TIME ZONE"
1042 elif type_.timezone:
1043 return "TIMESTAMP WITH TIME ZONE"
1044 else:
1045 return "TIMESTAMP"
1046
1047 def visit_DOUBLE_PRECISION(self, type_, **kw):
1048 return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
1049
1050 def visit_BINARY_DOUBLE(self, type_, **kw):
1051 return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
1052
1053 def visit_BINARY_FLOAT(self, type_, **kw):
1054 return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
1055
1056 def visit_FLOAT(self, type_, **kw):
1057 kw["_requires_binary_precision"] = True
1058 return self._generate_numeric(type_, "FLOAT", **kw)
1059
1060 def visit_NUMBER(self, type_, **kw):
1061 return self._generate_numeric(type_, "NUMBER", **kw)
1062
1063 def _generate_numeric(
1064 self,
1065 type_,
1066 name,
1067 precision=None,
1068 scale=None,
1069 _requires_binary_precision=False,
1070 **kw,
1071 ):
1072 if precision is None:
1073 precision = getattr(type_, "precision", None)
1074
1075 if _requires_binary_precision:
1076 binary_precision = getattr(type_, "binary_precision", None)
1077
1078 if precision and binary_precision is None:
1079 # https://www.oracletutorial.com/oracle-basics/oracle-float/
1080 estimated_binary_precision = int(precision / 0.30103)
1081 raise exc.ArgumentError(
1082 "Oracle Database FLOAT types use 'binary precision', "
1083 "which does not convert cleanly from decimal "
1084 "'precision'. Please specify "
1085 "this type with a separate Oracle Database variant, such "
1086 f"as {type_.__class__.__name__}(precision={precision})."
1087 f"with_variant(oracle.FLOAT"
1088 f"(binary_precision="
1089 f"{estimated_binary_precision}), 'oracle'), so that the "
1090 "Oracle Database specific 'binary_precision' may be "
1091 "specified accurately."
1092 )
1093 else:
1094 precision = binary_precision
1095
1096 if scale is None:
1097 scale = getattr(type_, "scale", None)
1098
1099 if precision is None:
1100 return name
1101 elif scale is None:
1102 n = "%(name)s(%(precision)s)"
1103 return n % {"name": name, "precision": precision}
1104 else:
1105 n = "%(name)s(%(precision)s, %(scale)s)"
1106 return n % {"name": name, "precision": precision, "scale": scale}
1107
1108 def visit_string(self, type_, **kw):
1109 return self.visit_VARCHAR2(type_, **kw)
1110
1111 def visit_VARCHAR2(self, type_, **kw):
1112 return self._visit_varchar(type_, "", "2")
1113
1114 def visit_NVARCHAR2(self, type_, **kw):
1115 return self._visit_varchar(type_, "N", "2")
1116
1117 visit_NVARCHAR = visit_NVARCHAR2
1118
1119 def visit_VARCHAR(self, type_, **kw):
1120 return self._visit_varchar(type_, "", "")
1121
1122 def _visit_varchar(self, type_, n, num):
1123 if not type_.length:
1124 return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
1125 elif not n and self.dialect._supports_char_length:
1126 varchar = "VARCHAR%(two)s(%(length)s CHAR)"
1127 return varchar % {"length": type_.length, "two": num}
1128 else:
1129 varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
1130 return varchar % {"length": type_.length, "two": num, "n": n}
1131
1132 def visit_text(self, type_, **kw):
1133 return self.visit_CLOB(type_, **kw)
1134
1135 def visit_unicode_text(self, type_, **kw):
1136 if self.dialect._use_nchar_for_unicode:
1137 return self.visit_NCLOB(type_, **kw)
1138 else:
1139 return self.visit_CLOB(type_, **kw)
1140
1141 def visit_large_binary(self, type_, **kw):
1142 return self.visit_BLOB(type_, **kw)
1143
1144 def visit_big_integer(self, type_, **kw):
1145 return self.visit_NUMBER(type_, precision=19, **kw)
1146
1147 def visit_boolean(self, type_, **kw):
1148 return self.visit_SMALLINT(type_, **kw)
1149
1150 def visit_RAW(self, type_, **kw):
1151 if type_.length:
1152 return "RAW(%(length)s)" % {"length": type_.length}
1153 else:
1154 return "RAW"
1155
1156 def visit_ROWID(self, type_, **kw):
1157 return "ROWID"
1158
1159 def visit_VECTOR(self, type_, **kw):
1160 if type_.dim is None and type_.storage_format is None:
1161 return "VECTOR(*,*)"
1162 elif type_.storage_format is None:
1163 return f"VECTOR({type_.dim},*)"
1164 elif type_.dim is None:
1165 return f"VECTOR(*,{type_.storage_format.value})"
1166 else:
1167 return f"VECTOR({type_.dim},{type_.storage_format.value})"
1168
1169
1170class OracleCompiler(compiler.SQLCompiler):
1171 """Oracle compiler modifies the lexical structure of Select
1172 statements to work under non-ANSI configured Oracle databases, if
1173 the use_ansi flag is False.
1174 """
1175
1176 compound_keywords = util.update_copy(
1177 compiler.SQLCompiler.compound_keywords,
1178 {expression.CompoundSelect.EXCEPT: "MINUS"},
1179 )
1180
1181 def __init__(self, *args, **kwargs):
1182 self.__wheres = {}
1183 super().__init__(*args, **kwargs)
1184
1185 def visit_mod_binary(self, binary, operator, **kw):
1186 return "mod(%s, %s)" % (
1187 self.process(binary.left, **kw),
1188 self.process(binary.right, **kw),
1189 )
1190
1191 def visit_now_func(self, fn, **kw):
1192 return "CURRENT_TIMESTAMP"
1193
1194 def visit_char_length_func(self, fn, **kw):
1195 return "LENGTH" + self.function_argspec(fn, **kw)
1196
1197 def visit_match_op_binary(self, binary, operator, **kw):
1198 return "CONTAINS (%s, %s)" % (
1199 self.process(binary.left),
1200 self.process(binary.right),
1201 )
1202
1203 def visit_true(self, expr, **kw):
1204 return "1"
1205
1206 def visit_false(self, expr, **kw):
1207 return "0"
1208
1209 def get_cte_preamble(self, recursive):
1210 return "WITH"
1211
1212 def get_select_hint_text(self, byfroms):
1213 return " ".join("/*+ %s */" % text for table, text in byfroms.items())
1214
1215 def function_argspec(self, fn, **kw):
1216 if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
1217 return compiler.SQLCompiler.function_argspec(self, fn, **kw)
1218 else:
1219 return ""
1220
1221 def visit_function(self, func, **kw):
1222 text = super().visit_function(func, **kw)
1223 if kw.get("asfrom", False) and func.name.lower() != "table":
1224 text = "TABLE (%s)" % text
1225 return text
1226
1227 def visit_table_valued_column(self, element, **kw):
1228 text = super().visit_table_valued_column(element, **kw)
1229 text = text + ".COLUMN_VALUE"
1230 return text
1231
1232 def default_from(self):
1233 """Called when a ``SELECT`` statement has no froms,
1234 and no ``FROM`` clause is to be appended.
1235
1236 The Oracle compiler tacks a "FROM DUAL" to the statement.
1237 """
1238
1239 return " FROM DUAL"
1240
1241 def visit_join(self, join, from_linter=None, **kwargs):
1242 if self.dialect.use_ansi:
1243 return compiler.SQLCompiler.visit_join(
1244 self, join, from_linter=from_linter, **kwargs
1245 )
1246 else:
1247 if from_linter:
1248 from_linter.edges.add((join.left, join.right))
1249
1250 kwargs["asfrom"] = True
1251 if isinstance(join.right, expression.FromGrouping):
1252 right = join.right.element
1253 else:
1254 right = join.right
1255 return (
1256 self.process(join.left, from_linter=from_linter, **kwargs)
1257 + ", "
1258 + self.process(right, from_linter=from_linter, **kwargs)
1259 )
1260
1261 def _get_nonansi_join_whereclause(self, froms):
1262 clauses = []
1263
1264 def visit_join(join):
1265 if join.isouter:
1266 # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
1267 # "apply the outer join operator (+) to all columns of B in
1268 # the join condition in the WHERE clause" - that is,
1269 # unconditionally regardless of operator or the other side
1270 def visit_binary(binary):
1271 if isinstance(
1272 binary.left, expression.ColumnClause
1273 ) and join.right.is_derived_from(binary.left.table):
1274 binary.left = _OuterJoinColumn(binary.left)
1275 elif isinstance(
1276 binary.right, expression.ColumnClause
1277 ) and join.right.is_derived_from(binary.right.table):
1278 binary.right = _OuterJoinColumn(binary.right)
1279
1280 clauses.append(
1281 visitors.cloned_traverse(
1282 join.onclause, {}, {"binary": visit_binary}
1283 )
1284 )
1285 else:
1286 clauses.append(join.onclause)
1287
1288 for j in join.left, join.right:
1289 if isinstance(j, expression.Join):
1290 visit_join(j)
1291 elif isinstance(j, expression.FromGrouping):
1292 visit_join(j.element)
1293
1294 for f in froms:
1295 if isinstance(f, expression.Join):
1296 visit_join(f)
1297
1298 if not clauses:
1299 return None
1300 else:
1301 return sql.and_(*clauses)
1302
1303 def visit_outer_join_column(self, vc, **kw):
1304 return self.process(vc.column, **kw) + "(+)"
1305
1306 def visit_sequence(self, seq, **kw):
1307 return self.preparer.format_sequence(seq) + ".nextval"
1308
1309 def get_render_as_alias_suffix(self, alias_name_text):
1310 """Oracle doesn't like ``FROM table AS alias``"""
1311
1312 return " " + alias_name_text
1313
1314 def returning_clause(
1315 self, stmt, returning_cols, *, populate_result_map, **kw
1316 ):
1317 columns = []
1318 binds = []
1319
1320 for i, column in enumerate(
1321 expression._select_iterables(returning_cols)
1322 ):
1323 if (
1324 self.isupdate
1325 and isinstance(column, sa_schema.Column)
1326 and isinstance(column.server_default, Computed)
1327 and not self.dialect._supports_update_returning_computed_cols
1328 ):
1329 util.warn(
1330 "Computed columns don't work with Oracle Database UPDATE "
1331 "statements that use RETURNING; the value of the column "
1332 "*before* the UPDATE takes place is returned. It is "
1333 "advised to not use RETURNING with an Oracle Database "
1334 "computed column. Consider setting implicit_returning "
1335 "to False on the Table object in order to avoid implicit "
1336 "RETURNING clauses from being generated for this Table."
1337 )
1338 if column.type._has_column_expression:
1339 col_expr = column.type.column_expression(column)
1340 else:
1341 col_expr = column
1342
1343 outparam = sql.outparam("ret_%d" % i, type_=column.type)
1344 self.binds[outparam.key] = outparam
1345 binds.append(
1346 self.bindparam_string(self._truncate_bindparam(outparam))
1347 )
1348
1349 # has_out_parameters would in a normal case be set to True
1350 # as a result of the compiler visiting an outparam() object.
1351 # in this case, the above outparam() objects are not being
1352 # visited. Ensure the statement itself didn't have other
1353 # outparam() objects independently.
1354 # technically, this could be supported, but as it would be
1355 # a very strange use case without a clear rationale, disallow it
1356 if self.has_out_parameters:
1357 raise exc.InvalidRequestError(
1358 "Using explicit outparam() objects with "
1359 "UpdateBase.returning() in the same Core DML statement "
1360 "is not supported in the Oracle Database dialects."
1361 )
1362
1363 self._oracle_returning = True
1364
1365 columns.append(self.process(col_expr, within_columns_clause=False))
1366 if populate_result_map:
1367 self._add_to_result_map(
1368 getattr(col_expr, "name", col_expr._anon_name_label),
1369 getattr(col_expr, "name", col_expr._anon_name_label),
1370 (
1371 column,
1372 getattr(column, "name", None),
1373 getattr(column, "key", None),
1374 ),
1375 column.type,
1376 )
1377
1378 return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
1379
1380 def _row_limit_clause(self, select, **kw):
1381 """Oracle Database 12c supports OFFSET/FETCH operators
1382 Use it instead subquery with row_number
1383
1384 """
1385
1386 if (
1387 select._fetch_clause is not None
1388 or not self.dialect._supports_offset_fetch
1389 ):
1390 return super()._row_limit_clause(
1391 select, use_literal_execute_for_simple_int=True, **kw
1392 )
1393 else:
1394 return self.fetch_clause(
1395 select,
1396 fetch_clause=self._get_limit_or_fetch(select),
1397 use_literal_execute_for_simple_int=True,
1398 **kw,
1399 )
1400
1401 def _get_limit_or_fetch(self, select):
1402 if select._fetch_clause is None:
1403 return select._limit_clause
1404 else:
1405 return select._fetch_clause
1406
1407 def fetch_clause(
1408 self,
1409 select,
1410 fetch_clause=None,
1411 require_offset=False,
1412 use_literal_execute_for_simple_int=False,
1413 **kw,
1414 ):
1415 text = super().fetch_clause(
1416 select,
1417 fetch_clause=fetch_clause,
1418 require_offset=require_offset,
1419 use_literal_execute_for_simple_int=(
1420 use_literal_execute_for_simple_int
1421 ),
1422 **kw,
1423 )
1424
1425 if select.dialect_options["oracle"]["fetch_approximate"]:
1426 text = re.sub("FETCH FIRST", "FETCH APPROX FIRST", text)
1427
1428 return text
1429
1430 def translate_select_structure(self, select_stmt, **kwargs):
1431 select = select_stmt
1432
1433 if not getattr(select, "_oracle_visit", None):
1434 if not self.dialect.use_ansi:
1435 froms = self._display_froms_for_select(
1436 select, kwargs.get("asfrom", False)
1437 )
1438 whereclause = self._get_nonansi_join_whereclause(froms)
1439 if whereclause is not None:
1440 select = select.where(whereclause)
1441 select._oracle_visit = True
1442
1443 # if fetch is used this is not needed
1444 if (
1445 select._has_row_limiting_clause
1446 and not self.dialect._supports_offset_fetch
1447 and select._fetch_clause is None
1448 ):
1449 limit_clause = select._limit_clause
1450 offset_clause = select._offset_clause
1451
1452 if select._simple_int_clause(limit_clause):
1453 limit_clause = limit_clause.render_literal_execute()
1454
1455 if select._simple_int_clause(offset_clause):
1456 offset_clause = offset_clause.render_literal_execute()
1457
1458 # currently using form at:
1459 # https://blogs.oracle.com/oraclemagazine/\
1460 # on-rownum-and-limiting-results
1461
1462 orig_select = select
1463 select = select._generate()
1464 select._oracle_visit = True
1465
1466 # add expressions to accommodate FOR UPDATE OF
1467 for_update = select._for_update_arg
1468 if for_update is not None and for_update.of:
1469 for_update = for_update._clone()
1470 for_update._copy_internals()
1471
1472 for elem in for_update.of:
1473 if not select.selected_columns.contains_column(elem):
1474 select = select.add_columns(elem)
1475
1476 # Wrap the middle select and add the hint
1477 inner_subquery = select.alias()
1478 limitselect = sql.select(
1479 *[
1480 c
1481 for c in inner_subquery.c
1482 if orig_select.selected_columns.corresponding_column(c)
1483 is not None
1484 ]
1485 )
1486
1487 if (
1488 limit_clause is not None
1489 and self.dialect.optimize_limits
1490 and select._simple_int_clause(limit_clause)
1491 ):
1492 limitselect = limitselect.prefix_with(
1493 expression.text(
1494 "/*+ FIRST_ROWS(%s) */"
1495 % self.process(limit_clause, **kwargs)
1496 )
1497 )
1498
1499 limitselect._oracle_visit = True
1500 limitselect._is_wrapper = True
1501
1502 # add expressions to accommodate FOR UPDATE OF
1503 if for_update is not None and for_update.of:
1504 adapter = sql_util.ClauseAdapter(inner_subquery)
1505 for_update.of = [
1506 adapter.traverse(elem) for elem in for_update.of
1507 ]
1508
1509 # If needed, add the limiting clause
1510 if limit_clause is not None:
1511 if select._simple_int_clause(limit_clause) and (
1512 offset_clause is None
1513 or select._simple_int_clause(offset_clause)
1514 ):
1515 max_row = limit_clause
1516
1517 if offset_clause is not None:
1518 max_row = max_row + offset_clause
1519
1520 else:
1521 max_row = limit_clause
1522
1523 if offset_clause is not None:
1524 max_row = max_row + offset_clause
1525 limitselect = limitselect.where(
1526 sql.literal_column("ROWNUM") <= max_row
1527 )
1528
1529 # If needed, add the ora_rn, and wrap again with offset.
1530 if offset_clause is None:
1531 limitselect._for_update_arg = for_update
1532 select = limitselect
1533 else:
1534 limitselect = limitselect.add_columns(
1535 sql.literal_column("ROWNUM").label("ora_rn")
1536 )
1537 limitselect._oracle_visit = True
1538 limitselect._is_wrapper = True
1539
1540 if for_update is not None and for_update.of:
1541 limitselect_cols = limitselect.selected_columns
1542 for elem in for_update.of:
1543 if (
1544 limitselect_cols.corresponding_column(elem)
1545 is None
1546 ):
1547 limitselect = limitselect.add_columns(elem)
1548
1549 limit_subquery = limitselect.alias()
1550 origselect_cols = orig_select.selected_columns
1551 offsetselect = sql.select(
1552 *[
1553 c
1554 for c in limit_subquery.c
1555 if origselect_cols.corresponding_column(c)
1556 is not None
1557 ]
1558 )
1559
1560 offsetselect._oracle_visit = True
1561 offsetselect._is_wrapper = True
1562
1563 if for_update is not None and for_update.of:
1564 adapter = sql_util.ClauseAdapter(limit_subquery)
1565 for_update.of = [
1566 adapter.traverse(elem) for elem in for_update.of
1567 ]
1568
1569 offsetselect = offsetselect.where(
1570 sql.literal_column("ora_rn") > offset_clause
1571 )
1572
1573 offsetselect._for_update_arg = for_update
1574 select = offsetselect
1575
1576 return select
1577
1578 def limit_clause(self, select, **kw):
1579 return ""
1580
1581 def visit_empty_set_expr(self, type_, **kw):
1582 return "SELECT 1 FROM DUAL WHERE 1!=1"
1583
1584 def for_update_clause(self, select, **kw):
1585 if self.is_subquery():
1586 return ""
1587
1588 tmp = " FOR UPDATE"
1589
1590 if select._for_update_arg.of:
1591 tmp += " OF " + ", ".join(
1592 self.process(elem, **kw) for elem in select._for_update_arg.of
1593 )
1594
1595 if select._for_update_arg.nowait:
1596 tmp += " NOWAIT"
1597 if select._for_update_arg.skip_locked:
1598 tmp += " SKIP LOCKED"
1599
1600 return tmp
1601
1602 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1603 return "DECODE(%s, %s, 0, 1) = 1" % (
1604 self.process(binary.left),
1605 self.process(binary.right),
1606 )
1607
1608 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1609 return "DECODE(%s, %s, 0, 1) = 0" % (
1610 self.process(binary.left),
1611 self.process(binary.right),
1612 )
1613
1614 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1615 string = self.process(binary.left, **kw)
1616 pattern = self.process(binary.right, **kw)
1617 flags = binary.modifiers["flags"]
1618 if flags is None:
1619 return "REGEXP_LIKE(%s, %s)" % (string, pattern)
1620 else:
1621 return "REGEXP_LIKE(%s, %s, %s)" % (
1622 string,
1623 pattern,
1624 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1625 )
1626
1627 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1628 return "NOT %s" % self.visit_regexp_match_op_binary(
1629 binary, operator, **kw
1630 )
1631
1632 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
1633 string = self.process(binary.left, **kw)
1634 pattern_replace = self.process(binary.right, **kw)
1635 flags = binary.modifiers["flags"]
1636 if flags is None:
1637 return "REGEXP_REPLACE(%s, %s)" % (
1638 string,
1639 pattern_replace,
1640 )
1641 else:
1642 return "REGEXP_REPLACE(%s, %s, %s)" % (
1643 string,
1644 pattern_replace,
1645 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1646 )
1647
1648 def visit_aggregate_strings_func(self, fn, **kw):
1649 return "LISTAGG%s" % self.function_argspec(fn, **kw)
1650
1651 def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw):
1652 left = self.process(binary.left, **kw)
1653 right = self.process(
1654 custom_right if custom_right is not None else binary.right, **kw
1655 )
1656 return f"{fn_name}({left}, {right})"
1657
1658 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1659 return self._visit_bitwise(binary, "BITXOR", **kw)
1660
1661 def visit_bitwise_or_op_binary(self, binary, operator, **kw):
1662 return self._visit_bitwise(binary, "BITOR", **kw)
1663
1664 def visit_bitwise_and_op_binary(self, binary, operator, **kw):
1665 return self._visit_bitwise(binary, "BITAND", **kw)
1666
1667 def visit_bitwise_rshift_op_binary(self, binary, operator, **kw):
1668 raise exc.CompileError("Cannot compile bitwise_rshift in oracle")
1669
1670 def visit_bitwise_lshift_op_binary(self, binary, operator, **kw):
1671 raise exc.CompileError("Cannot compile bitwise_lshift in oracle")
1672
1673 def visit_bitwise_not_op_unary_operator(self, element, operator, **kw):
1674 raise exc.CompileError("Cannot compile bitwise_not in oracle")
1675
1676
1677class OracleDDLCompiler(compiler.DDLCompiler):
1678
1679 def _build_vector_index_config(
1680 self, vector_index_config: VectorIndexConfig
1681 ) -> str:
1682 parts = []
1683 sql_param_name = {
1684 "hnsw_neighbors": "neighbors",
1685 "hnsw_efconstruction": "efconstruction",
1686 "ivf_neighbor_partitions": "neighbor partitions",
1687 "ivf_sample_per_partition": "sample_per_partition",
1688 "ivf_min_vectors_per_partition": "min_vectors_per_partition",
1689 }
1690 if vector_index_config.index_type == VectorIndexType.HNSW:
1691 parts.append("ORGANIZATION INMEMORY NEIGHBOR GRAPH")
1692 elif vector_index_config.index_type == VectorIndexType.IVF:
1693 parts.append("ORGANIZATION NEIGHBOR PARTITIONS")
1694 if vector_index_config.distance is not None:
1695 parts.append(f"DISTANCE {vector_index_config.distance.value}")
1696
1697 if vector_index_config.accuracy is not None:
1698 parts.append(
1699 f"WITH TARGET ACCURACY {vector_index_config.accuracy}"
1700 )
1701
1702 parameters_str = [f"type {vector_index_config.index_type.name}"]
1703 prefix = vector_index_config.index_type.name.lower() + "_"
1704
1705 for field in fields(vector_index_config):
1706 if field.name.startswith(prefix):
1707 key = sql_param_name.get(field.name)
1708 value = getattr(vector_index_config, field.name)
1709 if value is not None:
1710 parameters_str.append(f"{key} {value}")
1711
1712 parameters_str = ", ".join(parameters_str)
1713 parts.append(f"PARAMETERS ({parameters_str})")
1714
1715 if vector_index_config.parallel is not None:
1716 parts.append(f"PARALLEL {vector_index_config.parallel}")
1717
1718 return " ".join(parts)
1719
1720 def define_constraint_cascades(self, constraint):
1721 text = ""
1722 if constraint.ondelete is not None:
1723 text += " ON DELETE %s" % constraint.ondelete
1724
1725 # oracle has no ON UPDATE CASCADE -
1726 # its only available via triggers
1727 # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
1728 if constraint.onupdate is not None:
1729 util.warn(
1730 "Oracle Database does not contain native UPDATE CASCADE "
1731 "functionality - onupdates will not be rendered for foreign "
1732 "keys. Consider using deferrable=True, initially='deferred' "
1733 "or triggers."
1734 )
1735
1736 return text
1737
1738 def visit_drop_table_comment(self, drop, **kw):
1739 return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
1740 drop.element
1741 )
1742
1743 def visit_create_index(self, create, **kw):
1744 index = create.element
1745 self._verify_index_table(index)
1746 preparer = self.preparer
1747 text = "CREATE "
1748 if index.unique:
1749 text += "UNIQUE "
1750 if index.dialect_options["oracle"]["bitmap"]:
1751 text += "BITMAP "
1752 vector_options = index.dialect_options["oracle"]["vector"]
1753 if vector_options:
1754 text += "VECTOR "
1755 text += "INDEX %s ON %s (%s)" % (
1756 self._prepared_index_name(index, include_schema=True),
1757 preparer.format_table(index.table, use_schema=True),
1758 ", ".join(
1759 self.sql_compiler.process(
1760 expr, include_table=False, literal_binds=True
1761 )
1762 for expr in index.expressions
1763 ),
1764 )
1765 if index.dialect_options["oracle"]["compress"] is not False:
1766 if index.dialect_options["oracle"]["compress"] is True:
1767 text += " COMPRESS"
1768 else:
1769 text += " COMPRESS %d" % (
1770 index.dialect_options["oracle"]["compress"]
1771 )
1772 if vector_options:
1773 if vector_options is True:
1774 vector_options = VectorIndexConfig()
1775
1776 text += " " + self._build_vector_index_config(vector_options)
1777 return text
1778
1779 def post_create_table(self, table):
1780 table_opts = []
1781 opts = table.dialect_options["oracle"]
1782
1783 if opts["on_commit"]:
1784 on_commit_options = opts["on_commit"].replace("_", " ").upper()
1785 table_opts.append("\n ON COMMIT %s" % on_commit_options)
1786
1787 if opts["compress"]:
1788 if opts["compress"] is True:
1789 table_opts.append("\n COMPRESS")
1790 else:
1791 table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
1792 if opts["tablespace"]:
1793 table_opts.append(
1794 "\n TABLESPACE %s" % self.preparer.quote(opts["tablespace"])
1795 )
1796 return "".join(table_opts)
1797
1798 def get_identity_options(self, identity_options):
1799 text = super().get_identity_options(identity_options)
1800 text = text.replace("NO MINVALUE", "NOMINVALUE")
1801 text = text.replace("NO MAXVALUE", "NOMAXVALUE")
1802 text = text.replace("NO CYCLE", "NOCYCLE")
1803 if identity_options.order is not None:
1804 text += " ORDER" if identity_options.order else " NOORDER"
1805 return text.strip()
1806
1807 def visit_computed_column(self, generated, **kw):
1808 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
1809 generated.sqltext, include_table=False, literal_binds=True
1810 )
1811 if generated.persisted is True:
1812 raise exc.CompileError(
1813 "Oracle Database computed columns do not support 'stored' "
1814 "persistence; set the 'persisted' flag to None or False for "
1815 "Oracle Database support."
1816 )
1817 elif generated.persisted is False:
1818 text += " VIRTUAL"
1819 return text
1820
1821 def visit_identity_column(self, identity, **kw):
1822 if identity.always is None:
1823 kind = ""
1824 else:
1825 kind = "ALWAYS" if identity.always else "BY DEFAULT"
1826 text = "GENERATED %s" % kind
1827 if identity.on_null:
1828 text += " ON NULL"
1829 text += " AS IDENTITY"
1830 options = self.get_identity_options(identity)
1831 if options:
1832 text += " (%s)" % options
1833 return text
1834
1835
1836class OracleIdentifierPreparer(compiler.IdentifierPreparer):
1837 reserved_words = {x.lower() for x in RESERVED_WORDS}
1838 illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
1839 ["_", "$"]
1840 )
1841
1842 def _bindparam_requires_quotes(self, value):
1843 """Return True if the given identifier requires quoting."""
1844 lc_value = value.lower()
1845 return (
1846 lc_value in self.reserved_words
1847 or value[0] in self.illegal_initial_characters
1848 or not self.legal_characters.match(str(value))
1849 )
1850
1851 def format_savepoint(self, savepoint):
1852 name = savepoint.ident.lstrip("_")
1853 return super().format_savepoint(savepoint, name)
1854
1855
1856class OracleExecutionContext(default.DefaultExecutionContext):
1857 def fire_sequence(self, seq, type_):
1858 return self._execute_scalar(
1859 "SELECT "
1860 + self.identifier_preparer.format_sequence(seq)
1861 + ".nextval FROM DUAL",
1862 type_,
1863 )
1864
1865 def pre_exec(self):
1866 if self.statement and "_oracle_dblink" in self.execution_options:
1867 self.statement = self.statement.replace(
1868 dictionary.DB_LINK_PLACEHOLDER,
1869 self.execution_options["_oracle_dblink"],
1870 )
1871
1872
1873class OracleDialect(default.DefaultDialect):
1874 name = "oracle"
1875 supports_statement_cache = True
1876 supports_alter = True
1877 max_identifier_length = 128
1878
1879 _supports_offset_fetch = True
1880
1881 insert_returning = True
1882 update_returning = True
1883 delete_returning = True
1884
1885 div_is_floordiv = False
1886
1887 supports_simple_order_by_label = False
1888 cte_follows_insert = True
1889 returns_native_bytes = True
1890
1891 supports_sequences = True
1892 sequences_optional = False
1893 postfetch_lastrowid = False
1894
1895 default_paramstyle = "named"
1896 colspecs = colspecs
1897 ischema_names = ischema_names
1898 requires_name_normalize = True
1899
1900 supports_comments = True
1901
1902 supports_default_values = False
1903 supports_default_metavalue = True
1904 supports_empty_insert = False
1905 supports_identity_columns = True
1906
1907 statement_compiler = OracleCompiler
1908 ddl_compiler = OracleDDLCompiler
1909 type_compiler_cls = OracleTypeCompiler
1910 preparer = OracleIdentifierPreparer
1911 execution_ctx_cls = OracleExecutionContext
1912
1913 reflection_options = ("oracle_resolve_synonyms",)
1914
1915 _use_nchar_for_unicode = False
1916
1917 construct_arguments = [
1918 (
1919 sa_schema.Table,
1920 {
1921 "resolve_synonyms": False,
1922 "on_commit": None,
1923 "compress": False,
1924 "tablespace": None,
1925 },
1926 ),
1927 (
1928 sa_schema.Index,
1929 {
1930 "bitmap": False,
1931 "compress": False,
1932 "vector": False,
1933 },
1934 ),
1935 (sa_selectable.Select, {"fetch_approximate": False}),
1936 (sa_selectable.CompoundSelect, {"fetch_approximate": False}),
1937 ]
1938
1939 @util.deprecated_params(
1940 use_binds_for_limits=(
1941 "1.4",
1942 "The ``use_binds_for_limits`` Oracle Database dialect parameter "
1943 "is deprecated. The dialect now renders LIMIT / OFFSET integers "
1944 "inline in all cases using a post-compilation hook, so that the "
1945 "value is still represented by a 'bound parameter' on the Core "
1946 "Expression side.",
1947 )
1948 )
1949 def __init__(
1950 self,
1951 use_ansi=True,
1952 optimize_limits=False,
1953 use_binds_for_limits=None,
1954 use_nchar_for_unicode=False,
1955 exclude_tablespaces=("SYSTEM", "SYSAUX"),
1956 enable_offset_fetch=True,
1957 **kwargs,
1958 ):
1959 default.DefaultDialect.__init__(self, **kwargs)
1960 self._use_nchar_for_unicode = use_nchar_for_unicode
1961 self.use_ansi = use_ansi
1962 self.optimize_limits = optimize_limits
1963 self.exclude_tablespaces = exclude_tablespaces
1964 self.enable_offset_fetch = self._supports_offset_fetch = (
1965 enable_offset_fetch
1966 )
1967
1968 def initialize(self, connection):
1969 super().initialize(connection)
1970
1971 # Oracle 8i has RETURNING:
1972 # https://docs.oracle.com/cd/A87860_01/doc/index.htm
1973
1974 # so does Oracle8:
1975 # https://docs.oracle.com/cd/A64702_01/doc/index.htm
1976
1977 if self._is_oracle_8:
1978 self.colspecs = self.colspecs.copy()
1979 self.colspecs.pop(sqltypes.Interval)
1980 self.use_ansi = False
1981
1982 self.supports_identity_columns = self.server_version_info >= (12,)
1983 self._supports_offset_fetch = (
1984 self.enable_offset_fetch and self.server_version_info >= (12,)
1985 )
1986
1987 def _get_effective_compat_server_version_info(self, connection):
1988 # dialect does not need compat levels below 12.2, so don't query
1989 # in those cases
1990
1991 if self.server_version_info < (12, 2):
1992 return self.server_version_info
1993 try:
1994 compat = connection.exec_driver_sql(
1995 "SELECT value FROM v$parameter WHERE name = 'compatible'"
1996 ).scalar()
1997 except exc.DBAPIError:
1998 compat = None
1999
2000 if compat:
2001 try:
2002 return tuple(int(x) for x in compat.split("."))
2003 except:
2004 return self.server_version_info
2005 else:
2006 return self.server_version_info
2007
2008 @property
2009 def _is_oracle_8(self):
2010 return self.server_version_info and self.server_version_info < (9,)
2011
2012 @property
2013 def _supports_table_compression(self):
2014 return self.server_version_info and self.server_version_info >= (10, 1)
2015
2016 @property
2017 def _supports_table_compress_for(self):
2018 return self.server_version_info and self.server_version_info >= (11,)
2019
2020 @property
2021 def _supports_char_length(self):
2022 return not self._is_oracle_8
2023
2024 @property
2025 def _supports_update_returning_computed_cols(self):
2026 # on version 18 this error is no longet present while it happens on 11
2027 # it may work also on versions before the 18
2028 return self.server_version_info and self.server_version_info >= (18,)
2029
2030 @property
2031 def _supports_except_all(self):
2032 return self.server_version_info and self.server_version_info >= (21,)
2033
2034 def do_release_savepoint(self, connection, name):
2035 # Oracle does not support RELEASE SAVEPOINT
2036 pass
2037
2038 def _check_max_identifier_length(self, connection):
2039 if self._get_effective_compat_server_version_info(connection) < (
2040 12,
2041 2,
2042 ):
2043 return 30
2044 else:
2045 # use the default
2046 return None
2047
2048 def get_isolation_level_values(self, dbapi_connection):
2049 return ["READ COMMITTED", "SERIALIZABLE"]
2050
2051 def get_default_isolation_level(self, dbapi_conn):
2052 try:
2053 return self.get_isolation_level(dbapi_conn)
2054 except NotImplementedError:
2055 raise
2056 except:
2057 return "READ COMMITTED"
2058
2059 def _execute_reflection(
2060 self, connection, query, dblink, returns_long, params=None
2061 ):
2062 if dblink and not dblink.startswith("@"):
2063 dblink = f"@{dblink}"
2064 execution_options = {
2065 # handle db links
2066 "_oracle_dblink": dblink or "",
2067 # override any schema translate map
2068 "schema_translate_map": None,
2069 }
2070
2071 if dblink and returns_long:
2072 # Oracle seems to error with
2073 # "ORA-00997: illegal use of LONG datatype" when returning
2074 # LONG columns via a dblink in a query with bind params
2075 # This type seems to be very hard to cast into something else
2076 # so it seems easier to just use bind param in this case
2077 def visit_bindparam(bindparam):
2078 bindparam.literal_execute = True
2079
2080 query = visitors.cloned_traverse(
2081 query, {}, {"bindparam": visit_bindparam}
2082 )
2083 return connection.execute(
2084 query, params, execution_options=execution_options
2085 )
2086
2087 @util.memoized_property
2088 def _has_table_query(self):
2089 # materialized views are returned by all_tables
2090 tables = (
2091 select(
2092 dictionary.all_tables.c.table_name,
2093 dictionary.all_tables.c.owner,
2094 )
2095 .union_all(
2096 select(
2097 dictionary.all_views.c.view_name.label("table_name"),
2098 dictionary.all_views.c.owner,
2099 )
2100 )
2101 .subquery("tables_and_views")
2102 )
2103
2104 query = select(tables.c.table_name).where(
2105 tables.c.table_name == bindparam("table_name"),
2106 tables.c.owner == bindparam("owner"),
2107 )
2108 return query
2109
2110 @reflection.cache
2111 def has_table(
2112 self, connection, table_name, schema=None, dblink=None, **kw
2113 ):
2114 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2115 self._ensure_has_table_connection(connection)
2116
2117 if not schema:
2118 schema = self.default_schema_name
2119
2120 params = {
2121 "table_name": self.denormalize_name(table_name),
2122 "owner": self.denormalize_schema_name(schema),
2123 }
2124 cursor = self._execute_reflection(
2125 connection,
2126 self._has_table_query,
2127 dblink,
2128 returns_long=False,
2129 params=params,
2130 )
2131 return bool(cursor.scalar())
2132
2133 @reflection.cache
2134 def has_sequence(
2135 self, connection, sequence_name, schema=None, dblink=None, **kw
2136 ):
2137 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2138 if not schema:
2139 schema = self.default_schema_name
2140
2141 query = select(dictionary.all_sequences.c.sequence_name).where(
2142 dictionary.all_sequences.c.sequence_name
2143 == self.denormalize_schema_name(sequence_name),
2144 dictionary.all_sequences.c.sequence_owner
2145 == self.denormalize_schema_name(schema),
2146 )
2147
2148 cursor = self._execute_reflection(
2149 connection, query, dblink, returns_long=False
2150 )
2151 return bool(cursor.scalar())
2152
2153 def _get_default_schema_name(self, connection):
2154 return self.normalize_name(
2155 connection.exec_driver_sql(
2156 "select sys_context( 'userenv', 'current_schema' ) from dual"
2157 ).scalar()
2158 )
2159
2160 def denormalize_schema_name(self, name):
2161 # look for quoted_name
2162 force = getattr(name, "quote", None)
2163 if force is None and name == "public":
2164 # look for case insensitive, no quoting specified, "public"
2165 return "PUBLIC"
2166 return super().denormalize_name(name)
2167
2168 @reflection.flexi_cache(
2169 ("schema", InternalTraversal.dp_string),
2170 ("filter_names", InternalTraversal.dp_string_list),
2171 ("dblink", InternalTraversal.dp_string),
2172 )
2173 def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
2174 owner = self.denormalize_schema_name(
2175 schema or self.default_schema_name
2176 )
2177
2178 has_filter_names, params = self._prepare_filter_names(filter_names)
2179 query = select(
2180 dictionary.all_synonyms.c.synonym_name,
2181 dictionary.all_synonyms.c.table_name,
2182 dictionary.all_synonyms.c.table_owner,
2183 dictionary.all_synonyms.c.db_link,
2184 ).where(dictionary.all_synonyms.c.owner == owner)
2185 if has_filter_names:
2186 query = query.where(
2187 dictionary.all_synonyms.c.synonym_name.in_(
2188 params["filter_names"]
2189 )
2190 )
2191 result = self._execute_reflection(
2192 connection, query, dblink, returns_long=False
2193 ).mappings()
2194 return result.all()
2195
2196 @lru_cache()
2197 def _all_objects_query(
2198 self, owner, scope, kind, has_filter_names, has_mat_views
2199 ):
2200 query = (
2201 select(dictionary.all_objects.c.object_name)
2202 .select_from(dictionary.all_objects)
2203 .where(dictionary.all_objects.c.owner == owner)
2204 )
2205
2206 # NOTE: materialized views are listed in all_objects twice;
2207 # once as MATERIALIZE VIEW and once as TABLE
2208 if kind is ObjectKind.ANY:
2209 # materilaized view are listed also as tables so there is no
2210 # need to add them to the in_.
2211 query = query.where(
2212 dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW"))
2213 )
2214 else:
2215 object_type = []
2216 if ObjectKind.VIEW in kind:
2217 object_type.append("VIEW")
2218 if (
2219 ObjectKind.MATERIALIZED_VIEW in kind
2220 and ObjectKind.TABLE not in kind
2221 ):
2222 # materilaized view are listed also as tables so there is no
2223 # need to add them to the in_ if also selecting tables.
2224 object_type.append("MATERIALIZED VIEW")
2225 if ObjectKind.TABLE in kind:
2226 object_type.append("TABLE")
2227 if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind:
2228 # materialized view are listed also as tables,
2229 # so they need to be filtered out
2230 # EXCEPT ALL / MINUS profiles as faster than using
2231 # NOT EXISTS or NOT IN with a subquery, but it's in
2232 # general faster to get the mat view names and exclude
2233 # them only when needed
2234 query = query.where(
2235 dictionary.all_objects.c.object_name.not_in(
2236 bindparam("mat_views")
2237 )
2238 )
2239 query = query.where(
2240 dictionary.all_objects.c.object_type.in_(object_type)
2241 )
2242
2243 # handles scope
2244 if scope is ObjectScope.DEFAULT:
2245 query = query.where(dictionary.all_objects.c.temporary == "N")
2246 elif scope is ObjectScope.TEMPORARY:
2247 query = query.where(dictionary.all_objects.c.temporary == "Y")
2248
2249 if has_filter_names:
2250 query = query.where(
2251 dictionary.all_objects.c.object_name.in_(
2252 bindparam("filter_names")
2253 )
2254 )
2255 return query
2256
2257 @reflection.flexi_cache(
2258 ("schema", InternalTraversal.dp_string),
2259 ("scope", InternalTraversal.dp_plain_obj),
2260 ("kind", InternalTraversal.dp_plain_obj),
2261 ("filter_names", InternalTraversal.dp_string_list),
2262 ("dblink", InternalTraversal.dp_string),
2263 )
2264 def _get_all_objects(
2265 self, connection, schema, scope, kind, filter_names, dblink, **kw
2266 ):
2267 owner = self.denormalize_schema_name(
2268 schema or self.default_schema_name
2269 )
2270
2271 has_filter_names, params = self._prepare_filter_names(filter_names)
2272 has_mat_views = False
2273 if (
2274 ObjectKind.TABLE in kind
2275 and ObjectKind.MATERIALIZED_VIEW not in kind
2276 ):
2277 # see note in _all_objects_query
2278 mat_views = self.get_materialized_view_names(
2279 connection, schema, dblink, _normalize=False, **kw
2280 )
2281 if mat_views:
2282 params["mat_views"] = mat_views
2283 has_mat_views = True
2284
2285 query = self._all_objects_query(
2286 owner, scope, kind, has_filter_names, has_mat_views
2287 )
2288
2289 result = self._execute_reflection(
2290 connection, query, dblink, returns_long=False, params=params
2291 ).scalars()
2292
2293 return result.all()
2294
2295 def _handle_synonyms_decorator(fn):
2296 @wraps(fn)
2297 def wrapper(self, *args, **kwargs):
2298 return self._handle_synonyms(fn, *args, **kwargs)
2299
2300 return wrapper
2301
2302 def _handle_synonyms(self, fn, connection, *args, **kwargs):
2303 if not kwargs.get("oracle_resolve_synonyms", False):
2304 return fn(self, connection, *args, **kwargs)
2305
2306 original_kw = kwargs.copy()
2307 schema = kwargs.pop("schema", None)
2308 result = self._get_synonyms(
2309 connection,
2310 schema=schema,
2311 filter_names=kwargs.pop("filter_names", None),
2312 dblink=kwargs.pop("dblink", None),
2313 info_cache=kwargs.get("info_cache", None),
2314 )
2315
2316 dblinks_owners = defaultdict(dict)
2317 for row in result:
2318 key = row["db_link"], row["table_owner"]
2319 tn = self.normalize_name(row["table_name"])
2320 dblinks_owners[key][tn] = row["synonym_name"]
2321
2322 if not dblinks_owners:
2323 # No synonym, do the plain thing
2324 return fn(self, connection, *args, **original_kw)
2325
2326 data = {}
2327 for (dblink, table_owner), mapping in dblinks_owners.items():
2328 call_kw = {
2329 **original_kw,
2330 "schema": table_owner,
2331 "dblink": self.normalize_name(dblink),
2332 "filter_names": mapping.keys(),
2333 }
2334 call_result = fn(self, connection, *args, **call_kw)
2335 for (_, tn), value in call_result:
2336 synonym_name = self.normalize_name(mapping[tn])
2337 data[(schema, synonym_name)] = value
2338 return data.items()
2339
2340 @reflection.cache
2341 def get_schema_names(self, connection, dblink=None, **kw):
2342 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2343 query = select(dictionary.all_users.c.username).order_by(
2344 dictionary.all_users.c.username
2345 )
2346 result = self._execute_reflection(
2347 connection, query, dblink, returns_long=False
2348 ).scalars()
2349 return [self.normalize_name(row) for row in result]
2350
2351 @reflection.cache
2352 def get_table_names(self, connection, schema=None, dblink=None, **kw):
2353 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2354 # note that table_names() isn't loading DBLINKed or synonym'ed tables
2355 if schema is None:
2356 schema = self.default_schema_name
2357
2358 den_schema = self.denormalize_schema_name(schema)
2359 if kw.get("oracle_resolve_synonyms", False):
2360 tables = (
2361 select(
2362 dictionary.all_tables.c.table_name,
2363 dictionary.all_tables.c.owner,
2364 dictionary.all_tables.c.iot_name,
2365 dictionary.all_tables.c.duration,
2366 dictionary.all_tables.c.tablespace_name,
2367 )
2368 .union_all(
2369 select(
2370 dictionary.all_synonyms.c.synonym_name.label(
2371 "table_name"
2372 ),
2373 dictionary.all_synonyms.c.owner,
2374 dictionary.all_tables.c.iot_name,
2375 dictionary.all_tables.c.duration,
2376 dictionary.all_tables.c.tablespace_name,
2377 )
2378 .select_from(dictionary.all_tables)
2379 .join(
2380 dictionary.all_synonyms,
2381 and_(
2382 dictionary.all_tables.c.table_name
2383 == dictionary.all_synonyms.c.table_name,
2384 dictionary.all_tables.c.owner
2385 == func.coalesce(
2386 dictionary.all_synonyms.c.table_owner,
2387 dictionary.all_synonyms.c.owner,
2388 ),
2389 ),
2390 )
2391 )
2392 .subquery("available_tables")
2393 )
2394 else:
2395 tables = dictionary.all_tables
2396
2397 query = select(tables.c.table_name)
2398 if self.exclude_tablespaces:
2399 query = query.where(
2400 func.coalesce(
2401 tables.c.tablespace_name, "no tablespace"
2402 ).not_in(self.exclude_tablespaces)
2403 )
2404 query = query.where(
2405 tables.c.owner == den_schema,
2406 tables.c.iot_name.is_(null()),
2407 tables.c.duration.is_(null()),
2408 )
2409
2410 # remove materialized views
2411 mat_query = select(
2412 dictionary.all_mviews.c.mview_name.label("table_name")
2413 ).where(dictionary.all_mviews.c.owner == den_schema)
2414
2415 query = (
2416 query.except_all(mat_query)
2417 if self._supports_except_all
2418 else query.except_(mat_query)
2419 )
2420
2421 result = self._execute_reflection(
2422 connection, query, dblink, returns_long=False
2423 ).scalars()
2424 return [self.normalize_name(row) for row in result]
2425
2426 @reflection.cache
2427 def get_temp_table_names(self, connection, dblink=None, **kw):
2428 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2429 schema = self.denormalize_schema_name(self.default_schema_name)
2430
2431 query = select(dictionary.all_tables.c.table_name)
2432 if self.exclude_tablespaces:
2433 query = query.where(
2434 func.coalesce(
2435 dictionary.all_tables.c.tablespace_name, "no tablespace"
2436 ).not_in(self.exclude_tablespaces)
2437 )
2438 query = query.where(
2439 dictionary.all_tables.c.owner == schema,
2440 dictionary.all_tables.c.iot_name.is_(null()),
2441 dictionary.all_tables.c.duration.is_not(null()),
2442 )
2443
2444 result = self._execute_reflection(
2445 connection, query, dblink, returns_long=False
2446 ).scalars()
2447 return [self.normalize_name(row) for row in result]
2448
2449 @reflection.cache
2450 def get_materialized_view_names(
2451 self, connection, schema=None, dblink=None, _normalize=True, **kw
2452 ):
2453 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2454 if not schema:
2455 schema = self.default_schema_name
2456
2457 query = select(dictionary.all_mviews.c.mview_name).where(
2458 dictionary.all_mviews.c.owner
2459 == self.denormalize_schema_name(schema)
2460 )
2461 result = self._execute_reflection(
2462 connection, query, dblink, returns_long=False
2463 ).scalars()
2464 if _normalize:
2465 return [self.normalize_name(row) for row in result]
2466 else:
2467 return result.all()
2468
2469 @reflection.cache
2470 def get_view_names(self, connection, schema=None, dblink=None, **kw):
2471 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2472 if not schema:
2473 schema = self.default_schema_name
2474
2475 query = select(dictionary.all_views.c.view_name).where(
2476 dictionary.all_views.c.owner
2477 == self.denormalize_schema_name(schema)
2478 )
2479 result = self._execute_reflection(
2480 connection, query, dblink, returns_long=False
2481 ).scalars()
2482 return [self.normalize_name(row) for row in result]
2483
2484 @reflection.cache
2485 def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
2486 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2487 if not schema:
2488 schema = self.default_schema_name
2489 query = select(dictionary.all_sequences.c.sequence_name).where(
2490 dictionary.all_sequences.c.sequence_owner
2491 == self.denormalize_schema_name(schema)
2492 )
2493
2494 result = self._execute_reflection(
2495 connection, query, dblink, returns_long=False
2496 ).scalars()
2497 return [self.normalize_name(row) for row in result]
2498
2499 def _value_or_raise(self, data, table, schema):
2500 table = self.normalize_name(str(table))
2501 try:
2502 return dict(data)[(schema, table)]
2503 except KeyError:
2504 raise exc.NoSuchTableError(
2505 f"{schema}.{table}" if schema else table
2506 ) from None
2507
2508 def _prepare_filter_names(self, filter_names):
2509 if filter_names:
2510 fn = [self.denormalize_name(name) for name in filter_names]
2511 return True, {"filter_names": fn}
2512 else:
2513 return False, {}
2514
2515 @reflection.cache
2516 def get_table_options(self, connection, table_name, schema=None, **kw):
2517 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2518 ``oracle_resolve_synonyms`` to resolve names to synonyms
2519 """
2520 data = self.get_multi_table_options(
2521 connection,
2522 schema=schema,
2523 filter_names=[table_name],
2524 scope=ObjectScope.ANY,
2525 kind=ObjectKind.ANY,
2526 **kw,
2527 )
2528 return self._value_or_raise(data, table_name, schema)
2529
2530 @lru_cache()
2531 def _table_options_query(
2532 self, owner, scope, kind, has_filter_names, has_mat_views
2533 ):
2534 query = select(
2535 dictionary.all_tables.c.table_name,
2536 (
2537 dictionary.all_tables.c.compression
2538 if self._supports_table_compression
2539 else sql.null().label("compression")
2540 ),
2541 (
2542 dictionary.all_tables.c.compress_for
2543 if self._supports_table_compress_for
2544 else sql.null().label("compress_for")
2545 ),
2546 dictionary.all_tables.c.tablespace_name,
2547 ).where(dictionary.all_tables.c.owner == owner)
2548 if has_filter_names:
2549 query = query.where(
2550 dictionary.all_tables.c.table_name.in_(
2551 bindparam("filter_names")
2552 )
2553 )
2554 if scope is ObjectScope.DEFAULT:
2555 query = query.where(dictionary.all_tables.c.duration.is_(null()))
2556 elif scope is ObjectScope.TEMPORARY:
2557 query = query.where(
2558 dictionary.all_tables.c.duration.is_not(null())
2559 )
2560
2561 if (
2562 has_mat_views
2563 and ObjectKind.TABLE in kind
2564 and ObjectKind.MATERIALIZED_VIEW not in kind
2565 ):
2566 # cant use EXCEPT ALL / MINUS here because we don't have an
2567 # excludable row vs. the query above
2568 # outerjoin + where null works better on oracle 21 but 11 does
2569 # not like it at all. this is the next best thing
2570
2571 query = query.where(
2572 dictionary.all_tables.c.table_name.not_in(
2573 bindparam("mat_views")
2574 )
2575 )
2576 elif (
2577 ObjectKind.TABLE not in kind
2578 and ObjectKind.MATERIALIZED_VIEW in kind
2579 ):
2580 query = query.where(
2581 dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
2582 )
2583 return query
2584
2585 @_handle_synonyms_decorator
2586 def get_multi_table_options(
2587 self,
2588 connection,
2589 *,
2590 schema,
2591 filter_names,
2592 scope,
2593 kind,
2594 dblink=None,
2595 **kw,
2596 ):
2597 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2598 ``oracle_resolve_synonyms`` to resolve names to synonyms
2599 """
2600 owner = self.denormalize_schema_name(
2601 schema or self.default_schema_name
2602 )
2603
2604 has_filter_names, params = self._prepare_filter_names(filter_names)
2605 has_mat_views = False
2606
2607 if (
2608 ObjectKind.TABLE in kind
2609 and ObjectKind.MATERIALIZED_VIEW not in kind
2610 ):
2611 # see note in _table_options_query
2612 mat_views = self.get_materialized_view_names(
2613 connection, schema, dblink, _normalize=False, **kw
2614 )
2615 if mat_views:
2616 params["mat_views"] = mat_views
2617 has_mat_views = True
2618 elif (
2619 ObjectKind.TABLE not in kind
2620 and ObjectKind.MATERIALIZED_VIEW in kind
2621 ):
2622 mat_views = self.get_materialized_view_names(
2623 connection, schema, dblink, _normalize=False, **kw
2624 )
2625 params["mat_views"] = mat_views
2626
2627 options = {}
2628 default = ReflectionDefaults.table_options
2629
2630 if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
2631 query = self._table_options_query(
2632 owner, scope, kind, has_filter_names, has_mat_views
2633 )
2634 result = self._execute_reflection(
2635 connection, query, dblink, returns_long=False, params=params
2636 )
2637
2638 for table, compression, compress_for, tablespace in result:
2639 data = default()
2640 if compression == "ENABLED":
2641 data["oracle_compress"] = compress_for
2642 if tablespace:
2643 data["oracle_tablespace"] = tablespace
2644 options[(schema, self.normalize_name(table))] = data
2645 if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
2646 # add the views (no temporary views)
2647 for view in self.get_view_names(connection, schema, dblink, **kw):
2648 if not filter_names or view in filter_names:
2649 options[(schema, view)] = default()
2650
2651 return options.items()
2652
2653 @reflection.cache
2654 def get_columns(self, connection, table_name, schema=None, **kw):
2655 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2656 ``oracle_resolve_synonyms`` to resolve names to synonyms
2657 """
2658
2659 data = self.get_multi_columns(
2660 connection,
2661 schema=schema,
2662 filter_names=[table_name],
2663 scope=ObjectScope.ANY,
2664 kind=ObjectKind.ANY,
2665 **kw,
2666 )
2667 return self._value_or_raise(data, table_name, schema)
2668
2669 def _run_batches(
2670 self, connection, query, dblink, returns_long, mappings, all_objects
2671 ):
2672 each_batch = 500
2673 batches = list(all_objects)
2674 while batches:
2675 batch = batches[0:each_batch]
2676 batches[0:each_batch] = []
2677
2678 result = self._execute_reflection(
2679 connection,
2680 query,
2681 dblink,
2682 returns_long=returns_long,
2683 params={"all_objects": batch},
2684 )
2685 if mappings:
2686 yield from result.mappings()
2687 else:
2688 yield from result
2689
2690 @lru_cache()
2691 def _column_query(self, owner):
2692 all_cols = dictionary.all_tab_cols
2693 all_comments = dictionary.all_col_comments
2694 all_ids = dictionary.all_tab_identity_cols
2695
2696 if self.server_version_info >= (12,):
2697 add_cols = (
2698 all_cols.c.default_on_null,
2699 sql.case(
2700 (all_ids.c.table_name.is_(None), sql.null()),
2701 else_=all_ids.c.generation_type
2702 + ","
2703 + all_ids.c.identity_options,
2704 ).label("identity_options"),
2705 )
2706 join_identity_cols = True
2707 else:
2708 add_cols = (
2709 sql.null().label("default_on_null"),
2710 sql.null().label("identity_options"),
2711 )
2712 join_identity_cols = False
2713
2714 # NOTE: on oracle cannot create tables/views without columns and
2715 # a table cannot have all column hidden:
2716 # ORA-54039: table must have at least one column that is not invisible
2717 # all_tab_cols returns data for tables/views/mat-views.
2718 # all_tab_cols does not return recycled tables
2719
2720 query = (
2721 select(
2722 all_cols.c.table_name,
2723 all_cols.c.column_name,
2724 all_cols.c.data_type,
2725 all_cols.c.char_length,
2726 all_cols.c.data_precision,
2727 all_cols.c.data_scale,
2728 all_cols.c.nullable,
2729 all_cols.c.data_default,
2730 all_comments.c.comments,
2731 all_cols.c.virtual_column,
2732 *add_cols,
2733 ).select_from(all_cols)
2734 # NOTE: all_col_comments has a row for each column even if no
2735 # comment is present, so a join could be performed, but there
2736 # seems to be no difference compared to an outer join
2737 .outerjoin(
2738 all_comments,
2739 and_(
2740 all_cols.c.table_name == all_comments.c.table_name,
2741 all_cols.c.column_name == all_comments.c.column_name,
2742 all_cols.c.owner == all_comments.c.owner,
2743 ),
2744 )
2745 )
2746 if join_identity_cols:
2747 query = query.outerjoin(
2748 all_ids,
2749 and_(
2750 all_cols.c.table_name == all_ids.c.table_name,
2751 all_cols.c.column_name == all_ids.c.column_name,
2752 all_cols.c.owner == all_ids.c.owner,
2753 ),
2754 )
2755
2756 query = query.where(
2757 all_cols.c.table_name.in_(bindparam("all_objects")),
2758 all_cols.c.hidden_column == "NO",
2759 all_cols.c.owner == owner,
2760 ).order_by(all_cols.c.table_name, all_cols.c.column_id)
2761 return query
2762
2763 @_handle_synonyms_decorator
2764 def get_multi_columns(
2765 self,
2766 connection,
2767 *,
2768 schema,
2769 filter_names,
2770 scope,
2771 kind,
2772 dblink=None,
2773 **kw,
2774 ):
2775 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2776 ``oracle_resolve_synonyms`` to resolve names to synonyms
2777 """
2778 owner = self.denormalize_schema_name(
2779 schema or self.default_schema_name
2780 )
2781 query = self._column_query(owner)
2782
2783 if (
2784 filter_names
2785 and kind is ObjectKind.ANY
2786 and scope is ObjectScope.ANY
2787 ):
2788 all_objects = [self.denormalize_name(n) for n in filter_names]
2789 else:
2790 all_objects = self._get_all_objects(
2791 connection, schema, scope, kind, filter_names, dblink, **kw
2792 )
2793
2794 columns = defaultdict(list)
2795
2796 # all_tab_cols.data_default is LONG
2797 result = self._run_batches(
2798 connection,
2799 query,
2800 dblink,
2801 returns_long=True,
2802 mappings=True,
2803 all_objects=all_objects,
2804 )
2805
2806 def maybe_int(value):
2807 if isinstance(value, float) and value.is_integer():
2808 return int(value)
2809 else:
2810 return value
2811
2812 remove_size = re.compile(r"\(\d+\)")
2813
2814 for row_dict in result:
2815 table_name = self.normalize_name(row_dict["table_name"])
2816 orig_colname = row_dict["column_name"]
2817 colname = self.normalize_name(orig_colname)
2818 coltype = row_dict["data_type"]
2819 precision = maybe_int(row_dict["data_precision"])
2820
2821 if coltype == "NUMBER":
2822 scale = maybe_int(row_dict["data_scale"])
2823 if precision is None and scale == 0:
2824 coltype = INTEGER()
2825 else:
2826 coltype = NUMBER(precision, scale)
2827 elif coltype == "FLOAT":
2828 # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
2829 if precision == 126:
2830 # The DOUBLE PRECISION datatype is a floating-point
2831 # number with binary precision 126.
2832 coltype = DOUBLE_PRECISION()
2833 elif precision == 63:
2834 # The REAL datatype is a floating-point number with a
2835 # binary precision of 63, or 18 decimal.
2836 coltype = REAL()
2837 else:
2838 # non standard precision
2839 coltype = FLOAT(binary_precision=precision)
2840
2841 elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
2842 char_length = maybe_int(row_dict["char_length"])
2843 coltype = self.ischema_names.get(coltype)(char_length)
2844 elif "WITH TIME ZONE" in coltype:
2845 coltype = TIMESTAMP(timezone=True)
2846 elif "WITH LOCAL TIME ZONE" in coltype:
2847 coltype = TIMESTAMP(local_timezone=True)
2848 else:
2849 coltype = re.sub(remove_size, "", coltype)
2850 try:
2851 coltype = self.ischema_names[coltype]
2852 except KeyError:
2853 util.warn(
2854 "Did not recognize type '%s' of column '%s'"
2855 % (coltype, colname)
2856 )
2857 coltype = sqltypes.NULLTYPE
2858
2859 default = row_dict["data_default"]
2860 if row_dict["virtual_column"] == "YES":
2861 computed = dict(sqltext=default)
2862 default = None
2863 else:
2864 computed = None
2865
2866 identity_options = row_dict["identity_options"]
2867 if identity_options is not None:
2868 identity = self._parse_identity_options(
2869 identity_options, row_dict["default_on_null"]
2870 )
2871 default = None
2872 else:
2873 identity = None
2874
2875 cdict = {
2876 "name": colname,
2877 "type": coltype,
2878 "nullable": row_dict["nullable"] == "Y",
2879 "default": default,
2880 "comment": row_dict["comments"],
2881 }
2882 if orig_colname.lower() == orig_colname:
2883 cdict["quote"] = True
2884 if computed is not None:
2885 cdict["computed"] = computed
2886 if identity is not None:
2887 cdict["identity"] = identity
2888
2889 columns[(schema, table_name)].append(cdict)
2890
2891 # NOTE: default not needed since all tables have columns
2892 # default = ReflectionDefaults.columns
2893 # return (
2894 # (key, value if value else default())
2895 # for key, value in columns.items()
2896 # )
2897 return columns.items()
2898
2899 def _parse_identity_options(self, identity_options, default_on_null):
2900 # identity_options is a string that starts with 'ALWAYS,' or
2901 # 'BY DEFAULT,' and continues with
2902 # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
2903 # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
2904 # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
2905 parts = [p.strip() for p in identity_options.split(",")]
2906 identity = {
2907 "always": parts[0] == "ALWAYS",
2908 "on_null": default_on_null == "YES",
2909 }
2910
2911 for part in parts[1:]:
2912 option, value = part.split(":")
2913 value = value.strip()
2914
2915 if "START WITH" in option:
2916 identity["start"] = int(value)
2917 elif "INCREMENT BY" in option:
2918 identity["increment"] = int(value)
2919 elif "MAX_VALUE" in option:
2920 identity["maxvalue"] = int(value)
2921 elif "MIN_VALUE" in option:
2922 identity["minvalue"] = int(value)
2923 elif "CYCLE_FLAG" in option:
2924 identity["cycle"] = value == "Y"
2925 elif "CACHE_SIZE" in option:
2926 identity["cache"] = int(value)
2927 elif "ORDER_FLAG" in option:
2928 identity["order"] = value == "Y"
2929 return identity
2930
2931 @reflection.cache
2932 def get_table_comment(self, connection, table_name, schema=None, **kw):
2933 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2934 ``oracle_resolve_synonyms`` to resolve names to synonyms
2935 """
2936 data = self.get_multi_table_comment(
2937 connection,
2938 schema=schema,
2939 filter_names=[table_name],
2940 scope=ObjectScope.ANY,
2941 kind=ObjectKind.ANY,
2942 **kw,
2943 )
2944 return self._value_or_raise(data, table_name, schema)
2945
2946 @lru_cache()
2947 def _comment_query(self, owner, scope, kind, has_filter_names):
2948 # NOTE: all_tab_comments / all_mview_comments have a row for all
2949 # object even if they don't have comments
2950 queries = []
2951 if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
2952 # all_tab_comments returns also plain views
2953 tbl_view = select(
2954 dictionary.all_tab_comments.c.table_name,
2955 dictionary.all_tab_comments.c.comments,
2956 ).where(
2957 dictionary.all_tab_comments.c.owner == owner,
2958 dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
2959 )
2960 if ObjectKind.VIEW not in kind:
2961 tbl_view = tbl_view.where(
2962 dictionary.all_tab_comments.c.table_type == "TABLE"
2963 )
2964 elif ObjectKind.TABLE not in kind:
2965 tbl_view = tbl_view.where(
2966 dictionary.all_tab_comments.c.table_type == "VIEW"
2967 )
2968 queries.append(tbl_view)
2969 if ObjectKind.MATERIALIZED_VIEW in kind:
2970 mat_view = select(
2971 dictionary.all_mview_comments.c.mview_name.label("table_name"),
2972 dictionary.all_mview_comments.c.comments,
2973 ).where(
2974 dictionary.all_mview_comments.c.owner == owner,
2975 dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
2976 )
2977 queries.append(mat_view)
2978 if len(queries) == 1:
2979 query = queries[0]
2980 else:
2981 union = sql.union_all(*queries).subquery("tables_and_views")
2982 query = select(union.c.table_name, union.c.comments)
2983
2984 name_col = query.selected_columns.table_name
2985
2986 if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
2987 temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
2988 # need distinct since materialized view are listed also
2989 # as tables in all_objects
2990 query = query.distinct().join(
2991 dictionary.all_objects,
2992 and_(
2993 dictionary.all_objects.c.owner == owner,
2994 dictionary.all_objects.c.object_name == name_col,
2995 dictionary.all_objects.c.temporary == temp,
2996 ),
2997 )
2998 if has_filter_names:
2999 query = query.where(name_col.in_(bindparam("filter_names")))
3000 return query
3001
3002 @_handle_synonyms_decorator
3003 def get_multi_table_comment(
3004 self,
3005 connection,
3006 *,
3007 schema,
3008 filter_names,
3009 scope,
3010 kind,
3011 dblink=None,
3012 **kw,
3013 ):
3014 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3015 ``oracle_resolve_synonyms`` to resolve names to synonyms
3016 """
3017 owner = self.denormalize_schema_name(
3018 schema or self.default_schema_name
3019 )
3020 has_filter_names, params = self._prepare_filter_names(filter_names)
3021 query = self._comment_query(owner, scope, kind, has_filter_names)
3022
3023 result = self._execute_reflection(
3024 connection, query, dblink, returns_long=False, params=params
3025 )
3026 default = ReflectionDefaults.table_comment
3027 # materialized views by default seem to have a comment like
3028 # "snapshot table for snapshot owner.mat_view_name"
3029 ignore_mat_view = "snapshot table for snapshot "
3030 return (
3031 (
3032 (schema, self.normalize_name(table)),
3033 (
3034 {"text": comment}
3035 if comment is not None
3036 and not comment.startswith(ignore_mat_view)
3037 else default()
3038 ),
3039 )
3040 for table, comment in result
3041 )
3042
3043 @reflection.cache
3044 def get_indexes(self, connection, table_name, schema=None, **kw):
3045 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3046 ``oracle_resolve_synonyms`` to resolve names to synonyms
3047 """
3048 data = self.get_multi_indexes(
3049 connection,
3050 schema=schema,
3051 filter_names=[table_name],
3052 scope=ObjectScope.ANY,
3053 kind=ObjectKind.ANY,
3054 **kw,
3055 )
3056 return self._value_or_raise(data, table_name, schema)
3057
3058 @lru_cache()
3059 def _index_query(self, owner):
3060 return (
3061 select(
3062 dictionary.all_ind_columns.c.table_name,
3063 dictionary.all_ind_columns.c.index_name,
3064 dictionary.all_ind_columns.c.column_name,
3065 dictionary.all_indexes.c.index_type,
3066 dictionary.all_indexes.c.uniqueness,
3067 dictionary.all_indexes.c.compression,
3068 dictionary.all_indexes.c.prefix_length,
3069 dictionary.all_ind_columns.c.descend,
3070 dictionary.all_ind_expressions.c.column_expression,
3071 )
3072 .select_from(dictionary.all_ind_columns)
3073 .join(
3074 dictionary.all_indexes,
3075 sql.and_(
3076 dictionary.all_ind_columns.c.index_name
3077 == dictionary.all_indexes.c.index_name,
3078 dictionary.all_ind_columns.c.index_owner
3079 == dictionary.all_indexes.c.owner,
3080 ),
3081 )
3082 .outerjoin(
3083 # NOTE: this adds about 20% to the query time. Using a
3084 # case expression with a scalar subquery only when needed
3085 # with the assumption that most indexes are not expression
3086 # would be faster but oracle does not like that with
3087 # LONG datatype. It errors with:
3088 # ORA-00997: illegal use of LONG datatype
3089 dictionary.all_ind_expressions,
3090 sql.and_(
3091 dictionary.all_ind_expressions.c.index_name
3092 == dictionary.all_ind_columns.c.index_name,
3093 dictionary.all_ind_expressions.c.index_owner
3094 == dictionary.all_ind_columns.c.index_owner,
3095 dictionary.all_ind_expressions.c.column_position
3096 == dictionary.all_ind_columns.c.column_position,
3097 ),
3098 )
3099 .where(
3100 dictionary.all_indexes.c.table_owner == owner,
3101 dictionary.all_indexes.c.table_name.in_(
3102 bindparam("all_objects")
3103 ),
3104 )
3105 .order_by(
3106 dictionary.all_ind_columns.c.index_name,
3107 dictionary.all_ind_columns.c.column_position,
3108 )
3109 )
3110
3111 @reflection.flexi_cache(
3112 ("schema", InternalTraversal.dp_string),
3113 ("dblink", InternalTraversal.dp_string),
3114 ("all_objects", InternalTraversal.dp_string_list),
3115 )
3116 def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
3117 owner = self.denormalize_schema_name(
3118 schema or self.default_schema_name
3119 )
3120
3121 query = self._index_query(owner)
3122
3123 pks = {
3124 row_dict["constraint_name"]
3125 for row_dict in self._get_all_constraint_rows(
3126 connection, schema, dblink, all_objects, **kw
3127 )
3128 if row_dict["constraint_type"] == "P"
3129 }
3130
3131 # all_ind_expressions.column_expression is LONG
3132 result = self._run_batches(
3133 connection,
3134 query,
3135 dblink,
3136 returns_long=True,
3137 mappings=True,
3138 all_objects=all_objects,
3139 )
3140
3141 return [
3142 row_dict
3143 for row_dict in result
3144 if row_dict["index_name"] not in pks
3145 ]
3146
3147 @_handle_synonyms_decorator
3148 def get_multi_indexes(
3149 self,
3150 connection,
3151 *,
3152 schema,
3153 filter_names,
3154 scope,
3155 kind,
3156 dblink=None,
3157 **kw,
3158 ):
3159 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3160 ``oracle_resolve_synonyms`` to resolve names to synonyms
3161 """
3162 all_objects = self._get_all_objects(
3163 connection, schema, scope, kind, filter_names, dblink, **kw
3164 )
3165
3166 uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
3167 enabled = {"DISABLED": False, "ENABLED": True}
3168 is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}
3169
3170 indexes = defaultdict(dict)
3171
3172 for row_dict in self._get_indexes_rows(
3173 connection, schema, dblink, all_objects, **kw
3174 ):
3175 index_name = self.normalize_name(row_dict["index_name"])
3176 table_name = self.normalize_name(row_dict["table_name"])
3177 table_indexes = indexes[(schema, table_name)]
3178
3179 if index_name not in table_indexes:
3180 table_indexes[index_name] = index_dict = {
3181 "name": index_name,
3182 "column_names": [],
3183 "dialect_options": {},
3184 "unique": uniqueness.get(row_dict["uniqueness"], False),
3185 }
3186 do = index_dict["dialect_options"]
3187 if row_dict["index_type"] in is_bitmap:
3188 do["oracle_bitmap"] = True
3189 if enabled.get(row_dict["compression"], False):
3190 do["oracle_compress"] = row_dict["prefix_length"]
3191
3192 else:
3193 index_dict = table_indexes[index_name]
3194
3195 expr = row_dict["column_expression"]
3196 if expr is not None:
3197 index_dict["column_names"].append(None)
3198 if "expressions" in index_dict:
3199 index_dict["expressions"].append(expr)
3200 else:
3201 index_dict["expressions"] = index_dict["column_names"][:-1]
3202 index_dict["expressions"].append(expr)
3203
3204 if row_dict["descend"].lower() != "asc":
3205 assert row_dict["descend"].lower() == "desc"
3206 cs = index_dict.setdefault("column_sorting", {})
3207 cs[expr] = ("desc",)
3208 else:
3209 assert row_dict["descend"].lower() == "asc"
3210 cn = self.normalize_name(row_dict["column_name"])
3211 index_dict["column_names"].append(cn)
3212 if "expressions" in index_dict:
3213 index_dict["expressions"].append(cn)
3214
3215 default = ReflectionDefaults.indexes
3216
3217 return (
3218 (key, list(indexes[key].values()) if key in indexes else default())
3219 for key in (
3220 (schema, self.normalize_name(obj_name))
3221 for obj_name in all_objects
3222 )
3223 )
3224
3225 @reflection.cache
3226 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
3227 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3228 ``oracle_resolve_synonyms`` to resolve names to synonyms
3229 """
3230 data = self.get_multi_pk_constraint(
3231 connection,
3232 schema=schema,
3233 filter_names=[table_name],
3234 scope=ObjectScope.ANY,
3235 kind=ObjectKind.ANY,
3236 **kw,
3237 )
3238 return self._value_or_raise(data, table_name, schema)
3239
3240 @lru_cache()
3241 def _constraint_query(self, owner):
3242 local = dictionary.all_cons_columns.alias("local")
3243 remote = dictionary.all_cons_columns.alias("remote")
3244 return (
3245 select(
3246 dictionary.all_constraints.c.table_name,
3247 dictionary.all_constraints.c.constraint_type,
3248 dictionary.all_constraints.c.constraint_name,
3249 local.c.column_name.label("local_column"),
3250 remote.c.table_name.label("remote_table"),
3251 remote.c.column_name.label("remote_column"),
3252 remote.c.owner.label("remote_owner"),
3253 dictionary.all_constraints.c.search_condition,
3254 dictionary.all_constraints.c.delete_rule,
3255 )
3256 .select_from(dictionary.all_constraints)
3257 .join(
3258 local,
3259 and_(
3260 local.c.owner == dictionary.all_constraints.c.owner,
3261 dictionary.all_constraints.c.constraint_name
3262 == local.c.constraint_name,
3263 ),
3264 )
3265 .outerjoin(
3266 remote,
3267 and_(
3268 dictionary.all_constraints.c.r_owner == remote.c.owner,
3269 dictionary.all_constraints.c.r_constraint_name
3270 == remote.c.constraint_name,
3271 or_(
3272 remote.c.position.is_(sql.null()),
3273 local.c.position == remote.c.position,
3274 ),
3275 ),
3276 )
3277 .where(
3278 dictionary.all_constraints.c.owner == owner,
3279 dictionary.all_constraints.c.table_name.in_(
3280 bindparam("all_objects")
3281 ),
3282 dictionary.all_constraints.c.constraint_type.in_(
3283 ("R", "P", "U", "C")
3284 ),
3285 )
3286 .order_by(
3287 dictionary.all_constraints.c.constraint_name, local.c.position
3288 )
3289 )
3290
3291 @reflection.flexi_cache(
3292 ("schema", InternalTraversal.dp_string),
3293 ("dblink", InternalTraversal.dp_string),
3294 ("all_objects", InternalTraversal.dp_string_list),
3295 )
3296 def _get_all_constraint_rows(
3297 self, connection, schema, dblink, all_objects, **kw
3298 ):
3299 owner = self.denormalize_schema_name(
3300 schema or self.default_schema_name
3301 )
3302 query = self._constraint_query(owner)
3303
3304 # since the result is cached a list must be created
3305 values = list(
3306 self._run_batches(
3307 connection,
3308 query,
3309 dblink,
3310 returns_long=False,
3311 mappings=True,
3312 all_objects=all_objects,
3313 )
3314 )
3315 return values
3316
3317 @_handle_synonyms_decorator
3318 def get_multi_pk_constraint(
3319 self,
3320 connection,
3321 *,
3322 scope,
3323 schema,
3324 filter_names,
3325 kind,
3326 dblink=None,
3327 **kw,
3328 ):
3329 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3330 ``oracle_resolve_synonyms`` to resolve names to synonyms
3331 """
3332 all_objects = self._get_all_objects(
3333 connection, schema, scope, kind, filter_names, dblink, **kw
3334 )
3335
3336 primary_keys = defaultdict(dict)
3337 default = ReflectionDefaults.pk_constraint
3338
3339 for row_dict in self._get_all_constraint_rows(
3340 connection, schema, dblink, all_objects, **kw
3341 ):
3342 if row_dict["constraint_type"] != "P":
3343 continue
3344 table_name = self.normalize_name(row_dict["table_name"])
3345 constraint_name = self.normalize_name(row_dict["constraint_name"])
3346 column_name = self.normalize_name(row_dict["local_column"])
3347
3348 table_pk = primary_keys[(schema, table_name)]
3349 if not table_pk:
3350 table_pk["name"] = constraint_name
3351 table_pk["constrained_columns"] = [column_name]
3352 else:
3353 table_pk["constrained_columns"].append(column_name)
3354
3355 return (
3356 (key, primary_keys[key] if key in primary_keys else default())
3357 for key in (
3358 (schema, self.normalize_name(obj_name))
3359 for obj_name in all_objects
3360 )
3361 )
3362
3363 @reflection.cache
3364 def get_foreign_keys(
3365 self,
3366 connection,
3367 table_name,
3368 schema=None,
3369 **kw,
3370 ):
3371 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3372 ``oracle_resolve_synonyms`` to resolve names to synonyms
3373 """
3374 data = self.get_multi_foreign_keys(
3375 connection,
3376 schema=schema,
3377 filter_names=[table_name],
3378 scope=ObjectScope.ANY,
3379 kind=ObjectKind.ANY,
3380 **kw,
3381 )
3382 return self._value_or_raise(data, table_name, schema)
3383
3384 @_handle_synonyms_decorator
3385 def get_multi_foreign_keys(
3386 self,
3387 connection,
3388 *,
3389 scope,
3390 schema,
3391 filter_names,
3392 kind,
3393 dblink=None,
3394 **kw,
3395 ):
3396 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3397 ``oracle_resolve_synonyms`` to resolve names to synonyms
3398 """
3399 all_objects = self._get_all_objects(
3400 connection, schema, scope, kind, filter_names, dblink, **kw
3401 )
3402
3403 resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
3404
3405 owner = self.denormalize_schema_name(
3406 schema or self.default_schema_name
3407 )
3408
3409 all_remote_owners = set()
3410 fkeys = defaultdict(dict)
3411
3412 for row_dict in self._get_all_constraint_rows(
3413 connection, schema, dblink, all_objects, **kw
3414 ):
3415 if row_dict["constraint_type"] != "R":
3416 continue
3417
3418 table_name = self.normalize_name(row_dict["table_name"])
3419 constraint_name = self.normalize_name(row_dict["constraint_name"])
3420 table_fkey = fkeys[(schema, table_name)]
3421
3422 assert constraint_name is not None
3423
3424 local_column = self.normalize_name(row_dict["local_column"])
3425 remote_table = self.normalize_name(row_dict["remote_table"])
3426 remote_column = self.normalize_name(row_dict["remote_column"])
3427 remote_owner_orig = row_dict["remote_owner"]
3428 remote_owner = self.normalize_name(remote_owner_orig)
3429 if remote_owner_orig is not None:
3430 all_remote_owners.add(remote_owner_orig)
3431
3432 if remote_table is None:
3433 # ticket 363
3434 if dblink and not dblink.startswith("@"):
3435 dblink = f"@{dblink}"
3436 util.warn(
3437 "Got 'None' querying 'table_name' from "
3438 f"all_cons_columns{dblink or ''} - does the user have "
3439 "proper rights to the table?"
3440 )
3441 continue
3442
3443 if constraint_name not in table_fkey:
3444 table_fkey[constraint_name] = fkey = {
3445 "name": constraint_name,
3446 "constrained_columns": [],
3447 "referred_schema": None,
3448 "referred_table": remote_table,
3449 "referred_columns": [],
3450 "options": {},
3451 }
3452
3453 if resolve_synonyms:
3454 # will be removed below
3455 fkey["_ref_schema"] = remote_owner
3456
3457 if schema is not None or remote_owner_orig != owner:
3458 fkey["referred_schema"] = remote_owner
3459
3460 delete_rule = row_dict["delete_rule"]
3461 if delete_rule != "NO ACTION":
3462 fkey["options"]["ondelete"] = delete_rule
3463
3464 else:
3465 fkey = table_fkey[constraint_name]
3466
3467 fkey["constrained_columns"].append(local_column)
3468 fkey["referred_columns"].append(remote_column)
3469
3470 if resolve_synonyms and all_remote_owners:
3471 query = select(
3472 dictionary.all_synonyms.c.owner,
3473 dictionary.all_synonyms.c.table_name,
3474 dictionary.all_synonyms.c.table_owner,
3475 dictionary.all_synonyms.c.synonym_name,
3476 ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))
3477
3478 result = self._execute_reflection(
3479 connection, query, dblink, returns_long=False
3480 ).mappings()
3481
3482 remote_owners_lut = {}
3483 for row in result:
3484 synonym_owner = self.normalize_name(row["owner"])
3485 table_name = self.normalize_name(row["table_name"])
3486
3487 remote_owners_lut[(synonym_owner, table_name)] = (
3488 row["table_owner"],
3489 row["synonym_name"],
3490 )
3491
3492 empty = (None, None)
3493 for table_fkeys in fkeys.values():
3494 for table_fkey in table_fkeys.values():
3495 key = (
3496 table_fkey.pop("_ref_schema"),
3497 table_fkey["referred_table"],
3498 )
3499 remote_owner, syn_name = remote_owners_lut.get(key, empty)
3500 if syn_name:
3501 sn = self.normalize_name(syn_name)
3502 table_fkey["referred_table"] = sn
3503 if schema is not None or remote_owner != owner:
3504 ro = self.normalize_name(remote_owner)
3505 table_fkey["referred_schema"] = ro
3506 else:
3507 table_fkey["referred_schema"] = None
3508 default = ReflectionDefaults.foreign_keys
3509
3510 return (
3511 (key, list(fkeys[key].values()) if key in fkeys else default())
3512 for key in (
3513 (schema, self.normalize_name(obj_name))
3514 for obj_name in all_objects
3515 )
3516 )
3517
3518 @reflection.cache
3519 def get_unique_constraints(
3520 self, connection, table_name, schema=None, **kw
3521 ):
3522 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3523 ``oracle_resolve_synonyms`` to resolve names to synonyms
3524 """
3525 data = self.get_multi_unique_constraints(
3526 connection,
3527 schema=schema,
3528 filter_names=[table_name],
3529 scope=ObjectScope.ANY,
3530 kind=ObjectKind.ANY,
3531 **kw,
3532 )
3533 return self._value_or_raise(data, table_name, schema)
3534
3535 @_handle_synonyms_decorator
3536 def get_multi_unique_constraints(
3537 self,
3538 connection,
3539 *,
3540 scope,
3541 schema,
3542 filter_names,
3543 kind,
3544 dblink=None,
3545 **kw,
3546 ):
3547 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3548 ``oracle_resolve_synonyms`` to resolve names to synonyms
3549 """
3550 all_objects = self._get_all_objects(
3551 connection, schema, scope, kind, filter_names, dblink, **kw
3552 )
3553
3554 unique_cons = defaultdict(dict)
3555
3556 index_names = {
3557 row_dict["index_name"]
3558 for row_dict in self._get_indexes_rows(
3559 connection, schema, dblink, all_objects, **kw
3560 )
3561 }
3562
3563 for row_dict in self._get_all_constraint_rows(
3564 connection, schema, dblink, all_objects, **kw
3565 ):
3566 if row_dict["constraint_type"] != "U":
3567 continue
3568 table_name = self.normalize_name(row_dict["table_name"])
3569 constraint_name_orig = row_dict["constraint_name"]
3570 constraint_name = self.normalize_name(constraint_name_orig)
3571 column_name = self.normalize_name(row_dict["local_column"])
3572 table_uc = unique_cons[(schema, table_name)]
3573
3574 assert constraint_name is not None
3575
3576 if constraint_name not in table_uc:
3577 table_uc[constraint_name] = uc = {
3578 "name": constraint_name,
3579 "column_names": [],
3580 "duplicates_index": (
3581 constraint_name
3582 if constraint_name_orig in index_names
3583 else None
3584 ),
3585 }
3586 else:
3587 uc = table_uc[constraint_name]
3588
3589 uc["column_names"].append(column_name)
3590
3591 default = ReflectionDefaults.unique_constraints
3592
3593 return (
3594 (
3595 key,
3596 (
3597 list(unique_cons[key].values())
3598 if key in unique_cons
3599 else default()
3600 ),
3601 )
3602 for key in (
3603 (schema, self.normalize_name(obj_name))
3604 for obj_name in all_objects
3605 )
3606 )
3607
3608 @reflection.cache
3609 def get_view_definition(
3610 self,
3611 connection,
3612 view_name,
3613 schema=None,
3614 dblink=None,
3615 **kw,
3616 ):
3617 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3618 ``oracle_resolve_synonyms`` to resolve names to synonyms
3619 """
3620 if kw.get("oracle_resolve_synonyms", False):
3621 synonyms = self._get_synonyms(
3622 connection, schema, filter_names=[view_name], dblink=dblink
3623 )
3624 if synonyms:
3625 assert len(synonyms) == 1
3626 row_dict = synonyms[0]
3627 dblink = self.normalize_name(row_dict["db_link"])
3628 schema = row_dict["table_owner"]
3629 view_name = row_dict["table_name"]
3630
3631 name = self.denormalize_name(view_name)
3632 owner = self.denormalize_schema_name(
3633 schema or self.default_schema_name
3634 )
3635 query = (
3636 select(dictionary.all_views.c.text)
3637 .where(
3638 dictionary.all_views.c.view_name == name,
3639 dictionary.all_views.c.owner == owner,
3640 )
3641 .union_all(
3642 select(dictionary.all_mviews.c.query).where(
3643 dictionary.all_mviews.c.mview_name == name,
3644 dictionary.all_mviews.c.owner == owner,
3645 )
3646 )
3647 )
3648
3649 rp = self._execute_reflection(
3650 connection, query, dblink, returns_long=False
3651 ).scalar()
3652 if rp is None:
3653 raise exc.NoSuchTableError(
3654 f"{schema}.{view_name}" if schema else view_name
3655 )
3656 else:
3657 return rp
3658
3659 @reflection.cache
3660 def get_check_constraints(
3661 self, connection, table_name, schema=None, include_all=False, **kw
3662 ):
3663 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3664 ``oracle_resolve_synonyms`` to resolve names to synonyms
3665 """
3666 data = self.get_multi_check_constraints(
3667 connection,
3668 schema=schema,
3669 filter_names=[table_name],
3670 scope=ObjectScope.ANY,
3671 include_all=include_all,
3672 kind=ObjectKind.ANY,
3673 **kw,
3674 )
3675 return self._value_or_raise(data, table_name, schema)
3676
3677 @_handle_synonyms_decorator
3678 def get_multi_check_constraints(
3679 self,
3680 connection,
3681 *,
3682 schema,
3683 filter_names,
3684 dblink=None,
3685 scope,
3686 kind,
3687 include_all=False,
3688 **kw,
3689 ):
3690 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3691 ``oracle_resolve_synonyms`` to resolve names to synonyms
3692 """
3693 all_objects = self._get_all_objects(
3694 connection, schema, scope, kind, filter_names, dblink, **kw
3695 )
3696
3697 not_null = re.compile(r"..+?. IS NOT NULL$")
3698
3699 check_constraints = defaultdict(list)
3700
3701 for row_dict in self._get_all_constraint_rows(
3702 connection, schema, dblink, all_objects, **kw
3703 ):
3704 if row_dict["constraint_type"] != "C":
3705 continue
3706 table_name = self.normalize_name(row_dict["table_name"])
3707 constraint_name = self.normalize_name(row_dict["constraint_name"])
3708 search_condition = row_dict["search_condition"]
3709
3710 table_checks = check_constraints[(schema, table_name)]
3711 if constraint_name is not None and (
3712 include_all or not not_null.match(search_condition)
3713 ):
3714 table_checks.append(
3715 {"name": constraint_name, "sqltext": search_condition}
3716 )
3717
3718 default = ReflectionDefaults.check_constraints
3719
3720 return (
3721 (
3722 key,
3723 (
3724 check_constraints[key]
3725 if key in check_constraints
3726 else default()
3727 ),
3728 )
3729 for key in (
3730 (schema, self.normalize_name(obj_name))
3731 for obj_name in all_objects
3732 )
3733 )
3734
3735 def _list_dblinks(self, connection, dblink=None):
3736 query = select(dictionary.all_db_links.c.db_link)
3737 links = self._execute_reflection(
3738 connection, query, dblink, returns_long=False
3739 ).scalars()
3740 return [self.normalize_name(link) for link in links]
3741
3742
3743class _OuterJoinColumn(sql.ClauseElement):
3744 __visit_name__ = "outer_join_column"
3745
3746 def __init__(self, column):
3747 self.column = column