1# dialects/mysql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: mysql
12 :name: MySQL / MariaDB
13 :normal_support: 5.6+ / 10+
14 :best_effort: 5.0.2+ / 5.0.2+
15
16Supported Versions and Features
17-------------------------------
18
19SQLAlchemy supports MySQL starting with version 5.0.2 through modern releases,
20as well as all modern versions of MariaDB. See the official MySQL
21documentation for detailed information about features supported in any given
22server release.
23
24.. versionchanged:: 1.4 minimum MySQL version supported is now 5.0.2.
25
26MariaDB Support
27~~~~~~~~~~~~~~~
28
29The MariaDB variant of MySQL retains fundamental compatibility with MySQL's
30protocols however the development of these two products continues to diverge.
31Within the realm of SQLAlchemy, the two databases have a small number of
32syntactical and behavioral differences that SQLAlchemy accommodates automatically.
33To connect to a MariaDB database, no changes to the database URL are required::
34
35
36 engine = create_engine(
37 "mysql+pymysql://user:pass@some_mariadb/dbname?charset=utf8mb4"
38 )
39
40Upon first connect, the SQLAlchemy dialect employs a
41server version detection scheme that determines if the
42backing database reports as MariaDB. Based on this flag, the dialect
43can make different choices in those areas where its behavior
44must be different.
45
46.. _mysql_mariadb_only_mode:
47
48MariaDB-Only Mode
49~~~~~~~~~~~~~~~~~
50
51The dialect also supports an **optional** "MariaDB-only" mode of connection, which may be
52useful for the case where an application makes use of MariaDB-specific features
53and is not compatible with a MySQL database. To use this mode of operation,
54replace the "mysql" token in the above URL with "mariadb"::
55
56 engine = create_engine(
57 "mariadb+pymysql://user:pass@some_mariadb/dbname?charset=utf8mb4"
58 )
59
60The above engine, upon first connect, will raise an error if the server version
61detection detects that the backing database is not MariaDB.
62
63When using an engine with ``"mariadb"`` as the dialect name, **all mysql-specific options
64that include the name "mysql" in them are now named with "mariadb"**. This means
65options like ``mysql_engine`` should be named ``mariadb_engine``, etc. Both
66"mysql" and "mariadb" options can be used simultaneously for applications that
67use URLs with both "mysql" and "mariadb" dialects::
68
69 my_table = Table(
70 "mytable",
71 metadata,
72 Column("id", Integer, primary_key=True),
73 Column("textdata", String(50)),
74 mariadb_engine="InnoDB",
75 mysql_engine="InnoDB",
76 )
77
78 Index(
79 "textdata_ix",
80 my_table.c.textdata,
81 mysql_prefix="FULLTEXT",
82 mariadb_prefix="FULLTEXT",
83 )
84
85Similar behavior will occur when the above structures are reflected, i.e. the
86"mariadb" prefix will be present in the option names when the database URL
87is based on the "mariadb" name.
88
89.. versionadded:: 1.4 Added "mariadb" dialect name supporting "MariaDB-only mode"
90 for the MySQL dialect.
91
92.. _mysql_connection_timeouts:
93
94Connection Timeouts and Disconnects
95-----------------------------------
96
97MySQL / MariaDB feature an automatic connection close behavior, for connections that
98have been idle for a fixed period of time, defaulting to eight hours.
99To avoid this issue, use
100the :paramref:`_sa.create_engine.pool_recycle` option which ensures that
101a connection will be discarded and replaced with a new one if it has been
102present in the pool for a fixed number of seconds::
103
104 engine = create_engine("mysql+mysqldb://...", pool_recycle=3600)
105
106For more comprehensive disconnect detection of pooled connections, including
107accommodation of server restarts and network issues, a pre-ping approach may
108be employed. See :ref:`pool_disconnects` for current approaches.
109
110.. seealso::
111
112 :ref:`pool_disconnects` - Background on several techniques for dealing
113 with timed out connections as well as database restarts.
114
115.. _mysql_storage_engines:
116
117CREATE TABLE arguments including Storage Engines
118------------------------------------------------
119
120Both MySQL's and MariaDB's CREATE TABLE syntax includes a wide array of special options,
121including ``ENGINE``, ``CHARSET``, ``MAX_ROWS``, ``ROW_FORMAT``,
122``INSERT_METHOD``, and many more.
123To accommodate the rendering of these arguments, specify the form
124``mysql_argument_name="value"``. For example, to specify a table with
125``ENGINE`` of ``InnoDB``, ``CHARSET`` of ``utf8mb4``, and ``KEY_BLOCK_SIZE``
126of ``1024``::
127
128 Table(
129 "mytable",
130 metadata,
131 Column("data", String(32)),
132 mysql_engine="InnoDB",
133 mysql_charset="utf8mb4",
134 mysql_key_block_size="1024",
135 )
136
137When supporting :ref:`mysql_mariadb_only_mode`, similar keys against
138the "mariadb" prefix must be included as well. The values can of course
139vary independently so that different settings on MySQL vs. MariaDB may
140be maintained::
141
142 # support both "mysql" and "mariadb-only" engine URLs
143
144 Table(
145 "mytable",
146 metadata,
147 Column("data", String(32)),
148 mysql_engine="InnoDB",
149 mariadb_engine="InnoDB",
150 mysql_charset="utf8mb4",
151 mariadb_charset="utf8",
152 mysql_key_block_size="1024",
153 mariadb_key_block_size="1024",
154 )
155
156The MySQL / MariaDB dialects will normally transfer any keyword specified as
157``mysql_keyword_name`` to be rendered as ``KEYWORD_NAME`` in the
158``CREATE TABLE`` statement. A handful of these names will render with a space
159instead of an underscore; to support this, the MySQL dialect has awareness of
160these particular names, which include ``DATA DIRECTORY``
161(e.g. ``mysql_data_directory``), ``CHARACTER SET`` (e.g.
162``mysql_character_set``) and ``INDEX DIRECTORY`` (e.g.
163``mysql_index_directory``).
164
165The most common argument is ``mysql_engine``, which refers to the storage
166engine for the table. Historically, MySQL server installations would default
167to ``MyISAM`` for this value, although newer versions may be defaulting
168to ``InnoDB``. The ``InnoDB`` engine is typically preferred for its support
169of transactions and foreign keys.
170
171A :class:`_schema.Table`
172that is created in a MySQL / MariaDB database with a storage engine
173of ``MyISAM`` will be essentially non-transactional, meaning any
174INSERT/UPDATE/DELETE statement referring to this table will be invoked as
175autocommit. It also will have no support for foreign key constraints; while
176the ``CREATE TABLE`` statement accepts foreign key options, when using the
177``MyISAM`` storage engine these arguments are discarded. Reflecting such a
178table will also produce no foreign key constraint information.
179
180For fully atomic transactions as well as support for foreign key
181constraints, all participating ``CREATE TABLE`` statements must specify a
182transactional engine, which in the vast majority of cases is ``InnoDB``.
183
184Partitioning can be specified using similar options.
185In the example below the create table will specify ``PARTITION_BY``,
186``PARTITIONS``, ``SUBPARTITIONS`` and ``SUBPARTITION_BY``::
187
188 # can also use mariadb_* prefix
189 Table(
190 "testtable",
191 MetaData(),
192 Column("id", Integer(), primary_key=True, autoincrement=True),
193 Column("other_id", Integer(), primary_key=True, autoincrement=False),
194 mysql_partitions="2",
195 mysql_partition_by="KEY(other_id)",
196 mysql_subpartition_by="HASH(some_expr)",
197 mysql_subpartitions="2",
198 )
199
200This will render:
201
202.. sourcecode:: sql
203
204 CREATE TABLE testtable (
205 id INTEGER NOT NULL AUTO_INCREMENT,
206 other_id INTEGER NOT NULL,
207 PRIMARY KEY (id, other_id)
208 )PARTITION BY KEY(other_id) PARTITIONS 2 SUBPARTITION BY HASH(some_expr) SUBPARTITIONS 2
209
210Case Sensitivity and Table Reflection
211-------------------------------------
212
213Both MySQL and MariaDB have inconsistent support for case-sensitive identifier
214names, basing support on specific details of the underlying
215operating system. However, it has been observed that no matter
216what case sensitivity behavior is present, the names of tables in
217foreign key declarations are *always* received from the database
218as all-lower case, making it impossible to accurately reflect a
219schema where inter-related tables use mixed-case identifier names.
220
221Therefore it is strongly advised that table names be declared as
222all lower case both within SQLAlchemy as well as on the MySQL / MariaDB
223database itself, especially if database reflection features are
224to be used.
225
226.. _mysql_isolation_level:
227
228Transaction Isolation Level
229---------------------------
230
231All MySQL / MariaDB dialects support setting of transaction isolation level both via a
232dialect-specific parameter :paramref:`_sa.create_engine.isolation_level`
233accepted
234by :func:`_sa.create_engine`, as well as the
235:paramref:`.Connection.execution_options.isolation_level` argument as passed to
236:meth:`_engine.Connection.execution_options`.
237This feature works by issuing the
238command ``SET SESSION TRANSACTION ISOLATION LEVEL <level>`` for each new
239connection. For the special AUTOCOMMIT isolation level, DBAPI-specific
240techniques are used.
241
242To set isolation level using :func:`_sa.create_engine`::
243
244 engine = create_engine(
245 "mysql+mysqldb://scott:tiger@localhost/test",
246 isolation_level="READ UNCOMMITTED",
247 )
248
249To set using per-connection execution options::
250
251 connection = engine.connect()
252 connection = connection.execution_options(isolation_level="READ COMMITTED")
253
254Valid values for ``isolation_level`` include:
255
256* ``READ COMMITTED``
257* ``READ UNCOMMITTED``
258* ``REPEATABLE READ``
259* ``SERIALIZABLE``
260* ``AUTOCOMMIT``
261
262The special ``AUTOCOMMIT`` value makes use of the various "autocommit"
263attributes provided by specific DBAPIs, and is currently supported by
264MySQLdb, MySQL-Client, MySQL-Connector Python, and PyMySQL. Using it,
265the database connection will return true for the value of
266``SELECT @@autocommit;``.
267
268There are also more options for isolation level configurations, such as
269"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
270different isolation level settings. See the discussion at
271:ref:`dbapi_autocommit` for background.
272
273.. seealso::
274
275 :ref:`dbapi_autocommit`
276
277AUTO_INCREMENT Behavior
278-----------------------
279
280When creating tables, SQLAlchemy will automatically set ``AUTO_INCREMENT`` on
281the first :class:`.Integer` primary key column which is not marked as a
282foreign key::
283
284 >>> t = Table(
285 ... "mytable", metadata, Column("mytable_id", Integer, primary_key=True)
286 ... )
287 >>> t.create()
288 CREATE TABLE mytable (
289 mytable_id INTEGER NOT NULL AUTO_INCREMENT,
290 PRIMARY KEY (mytable_id)
291 )
292
293You can disable this behavior by passing ``False`` to the
294:paramref:`_schema.Column.autoincrement` argument of :class:`_schema.Column`.
295This flag
296can also be used to enable auto-increment on a secondary column in a
297multi-column key for some storage engines::
298
299 Table(
300 "mytable",
301 metadata,
302 Column("gid", Integer, primary_key=True, autoincrement=False),
303 Column("id", Integer, primary_key=True),
304 )
305
306.. _mysql_ss_cursors:
307
308Server Side Cursors
309-------------------
310
311Server-side cursor support is available for the mysqlclient, PyMySQL,
312and mariadbconnector dialects, and may also be available in others. This makes use
313of either the "buffered=True/False" flag, if available, or a class such
314as ``MySQLdb.cursors.SSCursor`` or ``pymysql.cursors.SSCursor`` internally.
315
316
317Server side cursors are enabled on a per-statement basis by using the
318:paramref:`.Connection.execution_options.stream_results` connection execution
319option::
320
321 with engine.connect() as conn:
322 result = conn.execution_options(stream_results=True).execute(
323 text("select * from table")
324 )
325
326Note that some kinds of SQL statements may not be supported with
327server side cursors; generally, only SQL statements that return rows should be
328used with this option.
329
330.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
331 and will be removed in a future release. Please use the
332 :paramref:`_engine.Connection.execution_options.stream_results` execution option for
333 unbuffered cursor support.
334
335.. seealso::
336
337 :ref:`engine_stream_results`
338
339.. _mysql_unicode:
340
341Unicode
342-------
343
344Charset Selection
345~~~~~~~~~~~~~~~~~
346
347Most MySQL / MariaDB DBAPIs offer the option to set the client character set for
348a connection. This is typically delivered using the ``charset`` parameter
349in the URL, such as::
350
351 e = create_engine(
352 "mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4"
353 )
354
355This charset is the **client character set** for the connection. Some
356MySQL DBAPIs will default this to a value such as ``latin1``, and some
357will make use of the ``default-character-set`` setting in the ``my.cnf``
358file as well. Documentation for the DBAPI in use should be consulted
359for specific behavior.
360
361The encoding used for Unicode has traditionally been ``'utf8'``. However, for
362MySQL versions 5.5.3 and MariaDB 5.5 on forward, a new MySQL-specific encoding
363``'utf8mb4'`` has been introduced, and as of MySQL 8.0 a warning is emitted by
364the server if plain ``utf8`` is specified within any server-side directives,
365as that name has been superseded by ``utf8mb3``. The rationale for this new encoding is
366that MySQL's legacy utf-8 encoding only supports codepoints up to three
367bytes instead of four. Therefore, when communicating with a MySQL or MariaDB
368database that includes codepoints more than three bytes in size, this new
369charset is preferred, if supported by both the database as well as the client
370DBAPI, as in::
371
372 e = create_engine(
373 "mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4"
374 )
375
376All modern DBAPIs should support the ``utf8mb4`` charset.
377
378In order to use ``utf8mb4`` encoding for a schema that was created with legacy
379``utf8``, changes to the MySQL/MariaDB schema and/or server configuration may be
380required.
381
382.. seealso::
383
384 `The utf8mb4 Character Set \
385 <https://dev.mysql.com/doc/refman/5.5/en/charset-unicode-utf8mb4.html>`_ - \
386 in the MySQL documentation
387
388.. _mysql_binary_introducer:
389
390Dealing with Binary Data Warnings and Unicode
391~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
392
393MySQL versions 5.6, 5.7 and later (not MariaDB at the time of this writing) now
394emit a warning when attempting to pass binary data to the database, while a
395character set encoding is also in place, when the binary data itself is not
396valid for that encoding:
397
398.. sourcecode:: text
399
400 default.py:509: Warning: (1300, "Invalid utf8mb4 character string:
401 'F9876A'")
402 cursor.execute(statement, parameters)
403
404This warning is due to the fact that the MySQL client library is attempting to
405interpret the binary string as a unicode object even if a datatype such
406as :class:`.LargeBinary` is in use. To resolve this, the SQL statement requires
407a binary "character set introducer" be present before any non-NULL value
408that renders like this:
409
410.. sourcecode:: sql
411
412 INSERT INTO table (data) VALUES (_binary %s)
413
414These character set introducers are provided by the DBAPI driver, assuming the
415use of mysqlclient or PyMySQL (both of which are recommended). Add the query
416string parameter ``binary_prefix=true`` to the URL to repair this warning::
417
418 # mysqlclient
419 engine = create_engine(
420 "mysql+mysqldb://scott:tiger@localhost/test?charset=utf8mb4&binary_prefix=true"
421 )
422
423 # PyMySQL
424 engine = create_engine(
425 "mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4&binary_prefix=true"
426 )
427
428The ``binary_prefix`` flag may or may not be supported by other MySQL drivers.
429
430SQLAlchemy itself cannot render this ``_binary`` prefix reliably, as it does
431not work with the NULL value, which is valid to be sent as a bound parameter.
432As the MySQL driver renders parameters directly into the SQL string, it's the
433most efficient place for this additional keyword to be passed.
434
435.. seealso::
436
437 `Character set introducers <https://dev.mysql.com/doc/refman/5.7/en/charset-introducer.html>`_ - on the MySQL website
438
439
440ANSI Quoting Style
441------------------
442
443MySQL / MariaDB feature two varieties of identifier "quoting style", one using
444backticks and the other using quotes, e.g. ```some_identifier``` vs.
445``"some_identifier"``. All MySQL dialects detect which version
446is in use by checking the value of :ref:`sql_mode<mysql_sql_mode>` when a connection is first
447established with a particular :class:`_engine.Engine`.
448This quoting style comes
449into play when rendering table and column names as well as when reflecting
450existing database structures. The detection is entirely automatic and
451no special configuration is needed to use either quoting style.
452
453
454.. _mysql_sql_mode:
455
456Changing the sql_mode
457---------------------
458
459MySQL supports operating in multiple
460`Server SQL Modes <https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html>`_ for
461both Servers and Clients. To change the ``sql_mode`` for a given application, a
462developer can leverage SQLAlchemy's Events system.
463
464In the following example, the event system is used to set the ``sql_mode`` on
465each new DBAPI connection via the ``connect`` event::
466
467 from sqlalchemy import create_engine, event
468
469 eng = create_engine(
470 "mysql+mysqldb://scott:tiger@localhost/test", echo="debug"
471 )
472
473
474 # `insert=True` will ensure this is the very first listener to run
475 @event.listens_for(eng, "connect", insert=True)
476 def connect(dbapi_connection, connection_record):
477 cursor = dbapi_connection.cursor()
478 cursor.execute("SET sql_mode = 'STRICT_ALL_TABLES'")
479
480
481 conn = eng.connect()
482
483In the example illustrated above, the "connect" event will invoke the "SET"
484statement on the connection at the moment a particular DBAPI connection is
485first created for a given Pool, before the connection is made available to the
486connection pool. Additionally, because the function was registered with
487``insert=True``, it will be prepended to the internal list of registered
488functions.
489
490
491MySQL / MariaDB SQL Extensions
492------------------------------
493
494Many of the MySQL / MariaDB SQL extensions are handled through SQLAlchemy's generic
495function and operator support::
496
497 table.select().where(table.c.password == func.md5("plaintext"))
498 table.select().where(table.c.username.op("regexp")("^[a-d]"))
499
500And of course any valid SQL statement can be executed as a string as well.
501
502Some limited direct support for MySQL / MariaDB extensions to SQL is currently
503available.
504
505* INSERT..ON DUPLICATE KEY UPDATE: See
506 :ref:`mysql_insert_on_duplicate_key_update`
507
508* SELECT pragma, use :meth:`_expression.Select.prefix_with` and
509 :meth:`_query.Query.prefix_with`::
510
511 select(...).prefix_with("HIGH_PRIORITY", "SQL_SMALL_RESULT")
512
513* UPDATE with LIMIT::
514
515 update(...).with_dialect_options(mysql_limit=10, mariadb_limit=10)
516
517* DELETE
518 with LIMIT::
519
520 delete(...).with_dialect_options(mysql_limit=10, mariadb_limit=10)
521
522 .. versionadded:: 2.0.37 Added delete with limit
523
524* optimizer hints, use :meth:`_expression.Select.prefix_with` and
525 :meth:`_query.Query.prefix_with`::
526
527 select(...).prefix_with("/*+ NO_RANGE_OPTIMIZATION(t4 PRIMARY) */")
528
529* index hints, use :meth:`_expression.Select.with_hint` and
530 :meth:`_query.Query.with_hint`::
531
532 select(...).with_hint(some_table, "USE INDEX xyz")
533
534* MATCH
535 operator support::
536
537 from sqlalchemy.dialects.mysql import match
538
539 select(...).where(match(col1, col2, against="some expr").in_boolean_mode())
540
541 .. seealso::
542
543 :class:`_mysql.match`
544
545INSERT/DELETE...RETURNING
546-------------------------
547
548The MariaDB dialect supports 10.5+'s ``INSERT..RETURNING`` and
549``DELETE..RETURNING`` (10.0+) syntaxes. ``INSERT..RETURNING`` may be used
550automatically in some cases in order to fetch newly generated identifiers in
551place of the traditional approach of using ``cursor.lastrowid``, however
552``cursor.lastrowid`` is currently still preferred for simple single-statement
553cases for its better performance.
554
555To specify an explicit ``RETURNING`` clause, use the
556:meth:`.UpdateBase.returning` method on a per-statement basis::
557
558 # INSERT..RETURNING
559 result = connection.execute(
560 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
561 )
562 print(result.all())
563
564 # DELETE..RETURNING
565 result = connection.execute(
566 table.delete()
567 .where(table.c.name == "foo")
568 .returning(table.c.col1, table.c.col2)
569 )
570 print(result.all())
571
572.. versionadded:: 2.0 Added support for MariaDB RETURNING
573
574.. _mysql_insert_on_duplicate_key_update:
575
576INSERT...ON DUPLICATE KEY UPDATE (Upsert)
577------------------------------------------
578
579MySQL / MariaDB allow "upserts" (update or insert)
580of rows into a table via the ``ON DUPLICATE KEY UPDATE`` clause of the
581``INSERT`` statement. A candidate row will only be inserted if that row does
582not match an existing primary or unique key in the table; otherwise, an UPDATE
583will be performed. The statement allows for separate specification of the
584values to INSERT versus the values for UPDATE.
585
586SQLAlchemy provides ``ON DUPLICATE KEY UPDATE`` support via the MySQL-specific
587:func:`.mysql.insert()` function, which provides
588the generative method :meth:`~.mysql.Insert.on_duplicate_key_update`:
589
590.. sourcecode:: pycon+sql
591
592 >>> from sqlalchemy.dialects.mysql import insert
593
594 >>> insert_stmt = insert(my_table).values(
595 ... id="some_existing_id", data="inserted value"
596 ... )
597
598 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
599 ... data=insert_stmt.inserted.data, status="U"
600 ... )
601 >>> print(on_duplicate_key_stmt)
602 {printsql}INSERT INTO my_table (id, data) VALUES (%s, %s)
603 ON DUPLICATE KEY UPDATE data = VALUES(data), status = %s
604
605
606Unlike PostgreSQL's "ON CONFLICT" phrase, the "ON DUPLICATE KEY UPDATE"
607phrase will always match on any primary key or unique key, and will always
608perform an UPDATE if there's a match; there are no options for it to raise
609an error or to skip performing an UPDATE.
610
611``ON DUPLICATE KEY UPDATE`` is used to perform an update of the already
612existing row, using any combination of new values as well as values
613from the proposed insertion. These values are normally specified using
614keyword arguments passed to the
615:meth:`_mysql.Insert.on_duplicate_key_update` method,
616with column key values (usually the name of the column, unless it
617specifies :paramref:`_schema.Column.key`) as keys
618and literal or SQL expressions
619as values:
620
621.. sourcecode:: pycon+sql
622
623 >>> insert_stmt = insert(my_table).values(
624 ... id="some_existing_id", data="inserted value"
625 ... )
626
627 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
628 ... data="some data",
629 ... updated_at=func.current_timestamp(),
630 ... )
631
632 >>> print(on_duplicate_key_stmt)
633 {printsql}INSERT INTO my_table (id, data) VALUES (%s, %s)
634 ON DUPLICATE KEY UPDATE data = %s, updated_at = CURRENT_TIMESTAMP
635
636In a manner similar to that of :meth:`.UpdateBase.values`, other parameter
637forms are accepted, including a single dictionary:
638
639.. sourcecode:: pycon+sql
640
641 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
642 ... {"data": "some data", "updated_at": func.current_timestamp()},
643 ... )
644
645as well as a list of 2-tuples, which will automatically provide
646a parameter-ordered UPDATE statement in a manner similar to that described
647at :ref:`tutorial_parameter_ordered_updates`. Unlike the :class:`_expression.Update`
648object,
649no special flag is needed to specify the intent since the argument form in
650this context is unambiguous:
651
652.. sourcecode:: pycon+sql
653
654 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
655 ... [
656 ... ("data", "some data"),
657 ... ("updated_at", func.current_timestamp()),
658 ... ]
659 ... )
660
661 >>> print(on_duplicate_key_stmt)
662 {printsql}INSERT INTO my_table (id, data) VALUES (%s, %s)
663 ON DUPLICATE KEY UPDATE data = %s, updated_at = CURRENT_TIMESTAMP
664
665.. versionchanged:: 1.3 support for parameter-ordered UPDATE clause within
666 MySQL ON DUPLICATE KEY UPDATE
667
668.. warning::
669
670 The :meth:`_mysql.Insert.on_duplicate_key_update`
671 method does **not** take into
672 account Python-side default UPDATE values or generation functions, e.g.
673 those specified using :paramref:`_schema.Column.onupdate`.
674 These values will not be exercised for an ON DUPLICATE KEY style of UPDATE,
675 unless they are manually specified explicitly in the parameters.
676
677
678
679In order to refer to the proposed insertion row, the special alias
680:attr:`_mysql.Insert.inserted` is available as an attribute on
681the :class:`_mysql.Insert` object; this object is a
682:class:`_expression.ColumnCollection` which contains all columns of the target
683table:
684
685.. sourcecode:: pycon+sql
686
687 >>> stmt = insert(my_table).values(
688 ... id="some_id", data="inserted value", author="jlh"
689 ... )
690
691 >>> do_update_stmt = stmt.on_duplicate_key_update(
692 ... data="updated value", author=stmt.inserted.author
693 ... )
694
695 >>> print(do_update_stmt)
696 {printsql}INSERT INTO my_table (id, data, author) VALUES (%s, %s, %s)
697 ON DUPLICATE KEY UPDATE data = %s, author = VALUES(author)
698
699When rendered, the "inserted" namespace will produce the expression
700``VALUES(<columnname>)``.
701
702.. versionadded:: 1.2 Added support for MySQL ON DUPLICATE KEY UPDATE clause
703
704
705
706rowcount Support
707----------------
708
709SQLAlchemy standardizes the DBAPI ``cursor.rowcount`` attribute to be the
710usual definition of "number of rows matched by an UPDATE or DELETE" statement.
711This is in contradiction to the default setting on most MySQL DBAPI drivers,
712which is "number of rows actually modified/deleted". For this reason, the
713SQLAlchemy MySQL dialects always add the ``constants.CLIENT.FOUND_ROWS``
714flag, or whatever is equivalent for the target dialect, upon connection.
715This setting is currently hardcoded.
716
717.. seealso::
718
719 :attr:`_engine.CursorResult.rowcount`
720
721
722.. _mysql_indexes:
723
724MySQL / MariaDB-Specific Index Options
725-----------------------------------------
726
727MySQL and MariaDB-specific extensions to the :class:`.Index` construct are available.
728
729Index Length
730~~~~~~~~~~~~~
731
732MySQL and MariaDB both provide an option to create index entries with a certain length, where
733"length" refers to the number of characters or bytes in each value which will
734become part of the index. SQLAlchemy provides this feature via the
735``mysql_length`` and/or ``mariadb_length`` parameters::
736
737 Index("my_index", my_table.c.data, mysql_length=10, mariadb_length=10)
738
739 Index("a_b_idx", my_table.c.a, my_table.c.b, mysql_length={"a": 4, "b": 9})
740
741 Index(
742 "a_b_idx", my_table.c.a, my_table.c.b, mariadb_length={"a": 4, "b": 9}
743 )
744
745Prefix lengths are given in characters for nonbinary string types and in bytes
746for binary string types. The value passed to the keyword argument *must* be
747either an integer (and, thus, specify the same prefix length value for all
748columns of the index) or a dict in which keys are column names and values are
749prefix length values for corresponding columns. MySQL and MariaDB only allow a
750length for a column of an index if it is a CHAR, VARCHAR, TEXT, BINARY,
751VARBINARY or BLOB column.
752
753Index Prefixes
754~~~~~~~~~~~~~~
755
756MySQL storage engines permit you to specify an index prefix when creating
757an index. SQLAlchemy provides this feature via the
758``mysql_prefix`` parameter on :class:`.Index`::
759
760 Index("my_index", my_table.c.data, mysql_prefix="FULLTEXT")
761
762The value passed to the keyword argument will be simply passed through to the
763underlying CREATE INDEX, so it *must* be a valid index prefix for your MySQL
764storage engine.
765
766.. seealso::
767
768 `CREATE INDEX <https://dev.mysql.com/doc/refman/5.0/en/create-index.html>`_ - MySQL documentation
769
770Index Types
771~~~~~~~~~~~~~
772
773Some MySQL storage engines permit you to specify an index type when creating
774an index or primary key constraint. SQLAlchemy provides this feature via the
775``mysql_using`` parameter on :class:`.Index`::
776
777 Index(
778 "my_index", my_table.c.data, mysql_using="hash", mariadb_using="hash"
779 )
780
781As well as the ``mysql_using`` parameter on :class:`.PrimaryKeyConstraint`::
782
783 PrimaryKeyConstraint("data", mysql_using="hash", mariadb_using="hash")
784
785The value passed to the keyword argument will be simply passed through to the
786underlying CREATE INDEX or PRIMARY KEY clause, so it *must* be a valid index
787type for your MySQL storage engine.
788
789More information can be found at:
790
791https://dev.mysql.com/doc/refman/5.0/en/create-index.html
792
793https://dev.mysql.com/doc/refman/5.0/en/create-table.html
794
795Index Parsers
796~~~~~~~~~~~~~
797
798CREATE FULLTEXT INDEX in MySQL also supports a "WITH PARSER" option. This
799is available using the keyword argument ``mysql_with_parser``::
800
801 Index(
802 "my_index",
803 my_table.c.data,
804 mysql_prefix="FULLTEXT",
805 mysql_with_parser="ngram",
806 mariadb_prefix="FULLTEXT",
807 mariadb_with_parser="ngram",
808 )
809
810.. versionadded:: 1.3
811
812
813.. _mysql_foreign_keys:
814
815MySQL / MariaDB Foreign Keys
816-----------------------------
817
818MySQL and MariaDB's behavior regarding foreign keys has some important caveats.
819
820Foreign Key Arguments to Avoid
821~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
822
823Neither MySQL nor MariaDB supports the foreign key arguments "DEFERRABLE", "INITIALLY",
824or "MATCH". Using the ``deferrable`` or ``initially`` keyword argument with
825:class:`_schema.ForeignKeyConstraint` or :class:`_schema.ForeignKey`
826will have the effect of
827these keywords being rendered in a DDL expression, which will then raise an
828error on MySQL or MariaDB. In order to use these keywords on a foreign key while having
829them ignored on a MySQL / MariaDB backend, use a custom compile rule::
830
831 from sqlalchemy.ext.compiler import compiles
832 from sqlalchemy.schema import ForeignKeyConstraint
833
834
835 @compiles(ForeignKeyConstraint, "mysql", "mariadb")
836 def process(element, compiler, **kw):
837 element.deferrable = element.initially = None
838 return compiler.visit_foreign_key_constraint(element, **kw)
839
840The "MATCH" keyword is in fact more insidious, and is explicitly disallowed
841by SQLAlchemy in conjunction with the MySQL or MariaDB backends. This argument is
842silently ignored by MySQL / MariaDB, but in addition has the effect of ON UPDATE and ON
843DELETE options also being ignored by the backend. Therefore MATCH should
844never be used with the MySQL / MariaDB backends; as is the case with DEFERRABLE and
845INITIALLY, custom compilation rules can be used to correct a
846ForeignKeyConstraint at DDL definition time.
847
848Reflection of Foreign Key Constraints
849~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
850
851Not all MySQL / MariaDB storage engines support foreign keys. When using the
852very common ``MyISAM`` MySQL storage engine, the information loaded by table
853reflection will not include foreign keys. For these tables, you may supply a
854:class:`~sqlalchemy.ForeignKeyConstraint` at reflection time::
855
856 Table(
857 "mytable",
858 metadata,
859 ForeignKeyConstraint(["other_id"], ["othertable.other_id"]),
860 autoload_with=engine,
861 )
862
863.. seealso::
864
865 :ref:`mysql_storage_engines`
866
867.. _mysql_unique_constraints:
868
869MySQL / MariaDB Unique Constraints and Reflection
870----------------------------------------------------
871
872SQLAlchemy supports both the :class:`.Index` construct with the
873flag ``unique=True``, indicating a UNIQUE index, as well as the
874:class:`.UniqueConstraint` construct, representing a UNIQUE constraint.
875Both objects/syntaxes are supported by MySQL / MariaDB when emitting DDL to create
876these constraints. However, MySQL / MariaDB does not have a unique constraint
877construct that is separate from a unique index; that is, the "UNIQUE"
878constraint on MySQL / MariaDB is equivalent to creating a "UNIQUE INDEX".
879
880When reflecting these constructs, the
881:meth:`_reflection.Inspector.get_indexes`
882and the :meth:`_reflection.Inspector.get_unique_constraints`
883methods will **both**
884return an entry for a UNIQUE index in MySQL / MariaDB. However, when performing
885full table reflection using ``Table(..., autoload_with=engine)``,
886the :class:`.UniqueConstraint` construct is
887**not** part of the fully reflected :class:`_schema.Table` construct under any
888circumstances; this construct is always represented by a :class:`.Index`
889with the ``unique=True`` setting present in the :attr:`_schema.Table.indexes`
890collection.
891
892
893TIMESTAMP / DATETIME issues
894---------------------------
895
896.. _mysql_timestamp_onupdate:
897
Rendering ON UPDATE CURRENT_TIMESTAMP for MySQL / MariaDB's explicit_defaults_for_timestamp
899~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
900
901MySQL / MariaDB have historically expanded the DDL for the :class:`_types.TIMESTAMP`
902datatype into the phrase "TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE
903CURRENT_TIMESTAMP", which includes non-standard SQL that automatically updates
904the column with the current timestamp when an UPDATE occurs, eliminating the
905usual need to use a trigger in such a case where server-side update changes are
906desired.
907
908MySQL 5.6 introduced a new flag `explicit_defaults_for_timestamp
909<https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html
910#sysvar_explicit_defaults_for_timestamp>`_ which disables the above behavior,
911and in MySQL 8 this flag defaults to true, meaning in order to get a MySQL
912"on update timestamp" without changing this flag, the above DDL must be
913rendered explicitly. Additionally, the same DDL is valid for use of the
914``DATETIME`` datatype as well.
915
916SQLAlchemy's MySQL dialect does not yet have an option to generate
917MySQL's "ON UPDATE CURRENT_TIMESTAMP" clause, noting that this is not a general
918purpose "ON UPDATE" as there is no such syntax in standard SQL. SQLAlchemy's
919:paramref:`_schema.Column.server_onupdate` parameter is currently not related
920to this special MySQL behavior.
921
922To generate this DDL, make use of the :paramref:`_schema.Column.server_default`
923parameter and pass a textual clause that also includes the ON UPDATE clause::
924
925 from sqlalchemy import Table, MetaData, Column, Integer, String, TIMESTAMP
926 from sqlalchemy import text
927
928 metadata = MetaData()
929
930 mytable = Table(
931 "mytable",
932 metadata,
933 Column("id", Integer, primary_key=True),
934 Column("data", String(50)),
935 Column(
936 "last_updated",
937 TIMESTAMP,
938 server_default=text(
939 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
940 ),
941 ),
942 )
943
944The same instructions apply to use of the :class:`_types.DateTime` and
945:class:`_types.DATETIME` datatypes::
946
947 from sqlalchemy import DateTime
948
949 mytable = Table(
950 "mytable",
951 metadata,
952 Column("id", Integer, primary_key=True),
953 Column("data", String(50)),
954 Column(
955 "last_updated",
956 DateTime,
957 server_default=text(
958 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
959 ),
960 ),
961 )
962
963Even though the :paramref:`_schema.Column.server_onupdate` feature does not
964generate this DDL, it still may be desirable to signal to the ORM that this
965updated value should be fetched. This syntax looks like the following::
966
967 from sqlalchemy.schema import FetchedValue
968
969
970 class MyClass(Base):
971 __tablename__ = "mytable"
972
973 id = Column(Integer, primary_key=True)
974 data = Column(String(50))
975 last_updated = Column(
976 TIMESTAMP,
977 server_default=text(
978 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
979 ),
980 server_onupdate=FetchedValue(),
981 )
982
983.. _mysql_timestamp_null:
984
985TIMESTAMP Columns and NULL
986~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
987
988MySQL historically enforces that a column which specifies the
989TIMESTAMP datatype implicitly includes a default value of
990CURRENT_TIMESTAMP, even though this is not stated, and additionally
991sets the column as NOT NULL, the opposite behavior vs. that of all
992other datatypes:
993
994.. sourcecode:: text
995
996 mysql> CREATE TABLE ts_test (
997 -> a INTEGER,
998 -> b INTEGER NOT NULL,
999 -> c TIMESTAMP,
1000 -> d TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
1001 -> e TIMESTAMP NULL);
1002 Query OK, 0 rows affected (0.03 sec)
1003
1004 mysql> SHOW CREATE TABLE ts_test;
1005 +---------+-----------------------------------------------------
1006 | Table | Create Table
1007 +---------+-----------------------------------------------------
1008 | ts_test | CREATE TABLE `ts_test` (
1009 `a` int(11) DEFAULT NULL,
1010 `b` int(11) NOT NULL,
1011 `c` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
1012 `d` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
1013 `e` timestamp NULL DEFAULT NULL
1014 ) ENGINE=MyISAM DEFAULT CHARSET=latin1
1015
1016Above, we see that an INTEGER column defaults to NULL, unless it is specified
1017with NOT NULL. But when the column is of type TIMESTAMP, an implicit
1018default of CURRENT_TIMESTAMP is generated which also coerces the column
to be NOT NULL, even though we did not specify it as such.
1020
1021This behavior of MySQL can be changed on the MySQL side using the
1022`explicit_defaults_for_timestamp
1023<https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html
1024#sysvar_explicit_defaults_for_timestamp>`_ configuration flag introduced in
1025MySQL 5.6. With this server setting enabled, TIMESTAMP columns behave like
1026any other datatype on the MySQL side with regards to defaults and nullability.
1027
1028However, to accommodate the vast majority of MySQL databases that do not
1029specify this new flag, SQLAlchemy emits the "NULL" specifier explicitly with
1030any TIMESTAMP column that does not specify ``nullable=False``. In order to
1031accommodate newer databases that specify ``explicit_defaults_for_timestamp``,
1032SQLAlchemy also emits NOT NULL for TIMESTAMP columns that do specify
1033``nullable=False``. The following example illustrates::
1034
1035 from sqlalchemy import MetaData, Integer, Table, Column, text
1036 from sqlalchemy.dialects.mysql import TIMESTAMP
1037
1038 m = MetaData()
1039 t = Table(
1040 "ts_test",
1041 m,
1042 Column("a", Integer),
1043 Column("b", Integer, nullable=False),
1044 Column("c", TIMESTAMP),
1045 Column("d", TIMESTAMP, nullable=False),
1046 )
1047
1048
1049 from sqlalchemy import create_engine
1050
1051 e = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo=True)
1052 m.create_all(e)
1053
1054output:
1055
1056.. sourcecode:: sql
1057
1058 CREATE TABLE ts_test (
1059 a INTEGER,
1060 b INTEGER NOT NULL,
1061 c TIMESTAMP NULL,
1062 d TIMESTAMP NOT NULL
1063 )
1064
1065""" # noqa
1066from __future__ import annotations
1067
1068from collections import defaultdict
1069from itertools import compress
1070import re
1071from typing import Any
1072from typing import Callable
1073from typing import cast
1074from typing import DefaultDict
1075from typing import Dict
1076from typing import List
1077from typing import NoReturn
1078from typing import Optional
1079from typing import overload
1080from typing import Sequence
1081from typing import Tuple
1082from typing import TYPE_CHECKING
1083from typing import Union
1084
1085from . import reflection as _reflection
1086from .enumerated import ENUM
1087from .enumerated import SET
1088from .json import JSON
1089from .json import JSONIndexType
1090from .json import JSONPathType
1091from .reserved_words import RESERVED_WORDS_MARIADB
1092from .reserved_words import RESERVED_WORDS_MYSQL
1093from .types import _FloatType
1094from .types import _IntegerType
1095from .types import _MatchType
1096from .types import _NumericType
1097from .types import _StringType
1098from .types import BIGINT
1099from .types import BIT
1100from .types import CHAR
1101from .types import DATETIME
1102from .types import DECIMAL
1103from .types import DOUBLE
1104from .types import FLOAT
1105from .types import INTEGER
1106from .types import LONGBLOB
1107from .types import LONGTEXT
1108from .types import MEDIUMBLOB
1109from .types import MEDIUMINT
1110from .types import MEDIUMTEXT
1111from .types import NCHAR
1112from .types import NUMERIC
1113from .types import NVARCHAR
1114from .types import REAL
1115from .types import SMALLINT
1116from .types import TEXT
1117from .types import TIME
1118from .types import TIMESTAMP
1119from .types import TINYBLOB
1120from .types import TINYINT
1121from .types import TINYTEXT
1122from .types import VARCHAR
1123from .types import YEAR
1124from ... import exc
1125from ... import literal_column
1126from ... import schema as sa_schema
1127from ... import sql
1128from ... import util
1129from ...engine import cursor as _cursor
1130from ...engine import default
1131from ...engine import reflection
1132from ...engine.reflection import ReflectionDefaults
1133from ...sql import coercions
1134from ...sql import compiler
1135from ...sql import elements
1136from ...sql import functions
1137from ...sql import operators
1138from ...sql import roles
1139from ...sql import sqltypes
1140from ...sql import util as sql_util
1141from ...sql import visitors
1142from ...sql.compiler import InsertmanyvaluesSentinelOpts
1143from ...sql.compiler import SQLCompiler
1144from ...sql.schema import SchemaConst
1145from ...types import BINARY
1146from ...types import BLOB
1147from ...types import BOOLEAN
1148from ...types import DATE
1149from ...types import LargeBinary
1150from ...types import UUID
1151from ...types import VARBINARY
1152from ...util import topological
1153
1154if TYPE_CHECKING:
1155
1156 from ...dialects.mysql import expression
1157 from ...dialects.mysql.dml import OnDuplicateClause
1158 from ...engine.base import Connection
1159 from ...engine.cursor import CursorResult
1160 from ...engine.interfaces import DBAPIConnection
1161 from ...engine.interfaces import DBAPICursor
1162 from ...engine.interfaces import DBAPIModule
1163 from ...engine.interfaces import IsolationLevel
1164 from ...engine.interfaces import PoolProxiedConnection
1165 from ...engine.interfaces import ReflectedCheckConstraint
1166 from ...engine.interfaces import ReflectedColumn
1167 from ...engine.interfaces import ReflectedForeignKeyConstraint
1168 from ...engine.interfaces import ReflectedIndex
1169 from ...engine.interfaces import ReflectedPrimaryKeyConstraint
1170 from ...engine.interfaces import ReflectedTableComment
1171 from ...engine.interfaces import ReflectedUniqueConstraint
1172 from ...engine.row import Row
1173 from ...engine.url import URL
1174 from ...schema import Table
1175 from ...sql import ddl
1176 from ...sql import selectable
1177 from ...sql.dml import _DMLTableElement
1178 from ...sql.dml import Delete
1179 from ...sql.dml import Update
1180 from ...sql.dml import ValuesBase
1181 from ...sql.functions import aggregate_strings
1182 from ...sql.functions import random
1183 from ...sql.functions import rollup
1184 from ...sql.functions import sysdate
1185 from ...sql.schema import IdentityOptions
1186 from ...sql.schema import Sequence as Sequence_SchemaItem
1187 from ...sql.type_api import TypeEngine
1188 from ...sql.visitors import ExternallyTraversible
1189
1190
# Recognizes the beginning of a ``SET`` statement: optional leading
# whitespace, the SET keyword, an optional GLOBAL / SESSION scope and the
# first word character of whatever follows.  Matching is case-insensitive.
SET_RE = re.compile(
    r"\s*SET\s+(?:(?:GLOBAL|SESSION)\s+)?\w",
    flags=re.IGNORECASE | re.UNICODE,
)
1194
# old names
#
# ``MS``-prefixed aliases for the type classes imported at the top of this
# module.  Each assignment only rebinds an existing class under a second
# name; no new type is created here.
MSTime = TIME
MSSet = SET
MSEnum = ENUM
MSLongBlob = LONGBLOB
MSMediumBlob = MEDIUMBLOB
MSTinyBlob = TINYBLOB
MSBlob = BLOB
MSBinary = BINARY
MSVarBinary = VARBINARY
MSNChar = NCHAR
MSNVarChar = NVARCHAR
MSChar = CHAR
# note: the "String" alias points at VARCHAR, not at a generic string type
MSString = VARCHAR
MSLongText = LONGTEXT
MSMediumText = MEDIUMTEXT
MSTinyText = TINYTEXT
MSText = TEXT
MSYear = YEAR
MSTimeStamp = TIMESTAMP
MSBit = BIT
MSSmallInteger = SMALLINT
MSTinyInteger = TINYINT
MSMediumInteger = MEDIUMINT
MSBigInteger = BIGINT
MSNumeric = NUMERIC
MSDecimal = DECIMAL
MSDouble = DOUBLE
MSReal = REAL
MSFloat = FLOAT
MSInteger = INTEGER
1226
# Type-class substitution table: each key is a type class and each value is
# the MySQL-specific class associated with it.  The first three entries map
# the private MySQL base types onto themselves.
# NOTE(review): presumably consumed by the dialect when adapting generic
# types to MySQL implementations -- the consumer is not visible in this
# part of the file; confirm against MySQLDialect.
colspecs = {
    _IntegerType: _IntegerType,
    _NumericType: _NumericType,
    _FloatType: _FloatType,
    sqltypes.Numeric: NUMERIC,
    sqltypes.Float: FLOAT,
    sqltypes.Double: DOUBLE,
    sqltypes.Time: TIME,
    sqltypes.Enum: ENUM,
    sqltypes.MatchType: _MatchType,
    sqltypes.JSON: JSON,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
}
1241
# Everything 3.23 through 5.1 excepting OpenGIS types.
#
# Lower-case type name -> type class.  Several names are synonyms that map
# to the same class ("fixed" -> DECIMAL, "int" / "integer" -> INTEGER).
# NOTE(review): the table also carries "json" and "uuid", which look newer
# than the version range stated above -- confirm and refresh that sentence.
# NOTE(review): presumably used to resolve type names during reflection;
# the consumer is not visible in this part of the file.
ischema_names = {
    "bigint": BIGINT,
    "binary": BINARY,
    "bit": BIT,
    "blob": BLOB,
    "boolean": BOOLEAN,
    "char": CHAR,
    "date": DATE,
    "datetime": DATETIME,
    "decimal": DECIMAL,
    "double": DOUBLE,
    "enum": ENUM,
    "fixed": DECIMAL,
    "float": FLOAT,
    "int": INTEGER,
    "integer": INTEGER,
    "json": JSON,
    "longblob": LONGBLOB,
    "longtext": LONGTEXT,
    "mediumblob": MEDIUMBLOB,
    "mediumint": MEDIUMINT,
    "mediumtext": MEDIUMTEXT,
    "nchar": NCHAR,
    "nvarchar": NVARCHAR,
    "numeric": NUMERIC,
    "set": SET,
    "smallint": SMALLINT,
    "text": TEXT,
    "time": TIME,
    "timestamp": TIMESTAMP,
    "tinyblob": TINYBLOB,
    "tinyint": TINYINT,
    "tinytext": TINYTEXT,
    "uuid": UUID,
    "varbinary": VARBINARY,
    "varchar": VARCHAR,
    "year": YEAR,
}
1281
1282
class MySQLExecutionContext(default.DefaultExecutionContext):
    """Execution context with MySQL / MariaDB specific cursor handling."""

    def post_exec(self) -> None:
        """Install an empty buffered result for a ``DELETE..RETURNING``
        whose cursor carries no description.

        Nothing happens unless the statement is a DELETE, the compiled
        statement has an effective RETURNING and ``cursor.description`` is
        empty.

        """
        if not self.isdelete:
            return

        compiled = cast(SQLCompiler, self.compiled)
        if not compiled.effective_returning or self.cursor.description:
            return

        # MySQL / MariaDB drivers appear to omit cursor.description for
        # DELETE..RETURNING that matches no rows when the WHERE criteria is
        # a plain "false" condition, such as the EMPTY IN condition (issue
        # #10505).  Build the column keys from the compiled result columns
        # and hand back an empty, fully buffered result instead.
        #
        # approach taken from the cx_Oracle implementation
        result_keys = [
            (entry.keyname, None)  # type: ignore[misc]
            for entry in compiled._result_columns
        ]
        self.cursor_fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy(
            self.cursor, result_keys, []
        )

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a DBAPI cursor created with the dialect's ``_sscursor``.

        :raises NotImplementedError: when the dialect does not report
         ``supports_server_side_cursors``.

        """
        if not self.dialect.supports_server_side_cursors:
            raise NotImplementedError()

        cursor_arg = self.dialect._sscursor  # type: ignore[attr-defined]
        return self._dbapi_connection.cursor(cursor_arg)

    def fire_sequence(
        self, seq: Sequence_SchemaItem, type_: sqltypes.Integer
    ) -> int:
        """Execute ``select nextval(<sequence>)`` and return its scalar
        result, using the identifier preparer to format the sequence name.

        """
        sequence_name = self.identifier_preparer.format_sequence(seq)
        return self._execute_scalar(  # type: ignore[no-any-return]
            "select nextval(%s)" % sequence_name,
            type_,
        )
1328
1329
class MySQLCompiler(compiler.SQLCompiler):
    """SQL statement compiler for the MySQL / MariaDB dialect.

    Overrides rendering hooks of the base ``SQLCompiler`` where the
    methods below emit MySQL-specific SQL text.

    """

    dialect: MySQLDialect
    render_table_with_column_in_update_from = True
    """Overridden from base SQLCompiler value"""

    # start from a copy so the base compiler's mapping is left untouched,
    # then map the "milliseconds" field to the singular "millisecond"
    extract_map = compiler.SQLCompiler.extract_map.copy()
    extract_map.update({"milliseconds": "millisecond"})
1337
1338 def default_from(self) -> str:
1339 """Called when a ``SELECT`` statement has no froms,
1340 and no ``FROM`` clause is to be appended.
1341
1342 """
1343 if self.stack:
1344 stmt = self.stack[-1]["selectable"]
1345 if stmt._where_criteria: # type: ignore[attr-defined]
1346 return " FROM DUAL"
1347
1348 return ""
1349
1350 def visit_random_func(self, fn: random, **kw: Any) -> str:
1351 return "rand%s" % self.function_argspec(fn)
1352
1353 def visit_rollup_func(self, fn: rollup[Any], **kw: Any) -> str:
1354 clause = ", ".join(
1355 elem._compiler_dispatch(self, **kw) for elem in fn.clauses
1356 )
1357 return f"{clause} WITH ROLLUP"
1358
1359 def visit_aggregate_strings_func(
1360 self, fn: aggregate_strings, **kw: Any
1361 ) -> str:
1362 expr, delimiter = (
1363 elem._compiler_dispatch(self, **kw) for elem in fn.clauses
1364 )
1365 return f"group_concat({expr} SEPARATOR {delimiter})"
1366
1367 def visit_sequence(self, sequence: sa_schema.Sequence, **kw: Any) -> str:
1368 return "nextval(%s)" % self.preparer.format_sequence(sequence)
1369
1370 def visit_sysdate_func(self, fn: sysdate, **kw: Any) -> str:
1371 return "SYSDATE()"
1372
1373 def _render_json_extract_from_binary(
1374 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1375 ) -> str:
1376 # note we are intentionally calling upon the process() calls in the
1377 # order in which they appear in the SQL String as this is used
1378 # by positional parameter rendering
1379
1380 if binary.type._type_affinity is sqltypes.JSON:
1381 return "JSON_EXTRACT(%s, %s)" % (
1382 self.process(binary.left, **kw),
1383 self.process(binary.right, **kw),
1384 )
1385
1386 # for non-JSON, MySQL doesn't handle JSON null at all so it has to
1387 # be explicit
1388 case_expression = "CASE JSON_EXTRACT(%s, %s) WHEN 'null' THEN NULL" % (
1389 self.process(binary.left, **kw),
1390 self.process(binary.right, **kw),
1391 )
1392
1393 if binary.type._type_affinity is sqltypes.Integer:
1394 type_expression = (
1395 "ELSE CAST(JSON_EXTRACT(%s, %s) AS SIGNED INTEGER)"
1396 % (
1397 self.process(binary.left, **kw),
1398 self.process(binary.right, **kw),
1399 )
1400 )
1401 elif binary.type._type_affinity is sqltypes.Numeric:
1402 binary_type = cast(sqltypes.Numeric[Any], binary.type)
1403 if (
1404 binary_type.scale is not None
1405 and binary_type.precision is not None
1406 ):
1407 # using DECIMAL here because MySQL does not recognize NUMERIC
1408 type_expression = (
1409 "ELSE CAST(JSON_EXTRACT(%s, %s) AS DECIMAL(%s, %s))"
1410 % (
1411 self.process(binary.left, **kw),
1412 self.process(binary.right, **kw),
1413 binary_type.precision,
1414 binary_type.scale,
1415 )
1416 )
1417 else:
1418 # FLOAT / REAL not added in MySQL til 8.0.17
1419 type_expression = (
1420 "ELSE JSON_EXTRACT(%s, %s)+0.0000000000000000000000"
1421 % (
1422 self.process(binary.left, **kw),
1423 self.process(binary.right, **kw),
1424 )
1425 )
1426 elif binary.type._type_affinity is sqltypes.Boolean:
1427 # the NULL handling is particularly weird with boolean, so
1428 # explicitly return true/false constants
1429 type_expression = "WHEN true THEN true ELSE false"
1430 elif binary.type._type_affinity is sqltypes.String:
1431 # (gord): this fails with a JSON value that's a four byte unicode
1432 # string. SQLite has the same problem at the moment
1433 # (zzzeek): I'm not really sure. let's take a look at a test case
1434 # that hits each backend and maybe make a requires rule for it?
1435 type_expression = "ELSE JSON_UNQUOTE(JSON_EXTRACT(%s, %s))" % (
1436 self.process(binary.left, **kw),
1437 self.process(binary.right, **kw),
1438 )
1439 else:
1440 # other affinity....this is not expected right now
1441 type_expression = "ELSE JSON_EXTRACT(%s, %s)" % (
1442 self.process(binary.left, **kw),
1443 self.process(binary.right, **kw),
1444 )
1445
1446 return case_expression + " " + type_expression + " END"
1447
1448 def visit_json_getitem_op_binary(
1449 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1450 ) -> str:
1451 return self._render_json_extract_from_binary(binary, operator, **kw)
1452
1453 def visit_json_path_getitem_op_binary(
1454 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1455 ) -> str:
1456 return self._render_json_extract_from_binary(binary, operator, **kw)
1457
    def visit_on_duplicate_key_update(
        self, on_duplicate: OnDuplicateClause, **kw: Any
    ) -> str:
        """Render the ``ON DUPLICATE KEY UPDATE`` clause for the statement
        currently being compiled.

        One ``<column> = <value>`` assignment is emitted per table column
        that has an entry in ``on_duplicate.update``.  Columns named in
        ``on_duplicate._parameter_ordering`` come first, in that order, and
        the remaining table columns follow in table order.

        References to ``on_duplicate.inserted_alias`` inside a value
        expression render either as ``VALUES(<col>)`` or, when the dialect
        requires a row alias and the INSERT has no SELECT, as
        ``<alias>.<col>`` with the whole clause prefixed by ``AS <alias>``.

        A warning is emitted for update keys that match no column key.

        :param on_duplicate: the clause construct holding the update
         mapping, the optional parameter ordering and the inserted alias.
        :return: the rendered clause text.

        """
        statement: ValuesBase = self.current_executable

        cols: List[elements.KeyedColumnElement[Any]]
        if on_duplicate._parameter_ordering:
            # explicitly ordered keys first (only those that are actual
            # table columns), then every table column not named there
            parameter_ordering = [
                coercions.expect(roles.DMLColumnRole, key)
                for key in on_duplicate._parameter_ordering
            ]
            ordered_keys = set(parameter_ordering)
            cols = [
                statement.table.c[key]
                for key in parameter_ordering
                if key in statement.table.c
            ] + [c for c in statement.table.c if c.key not in ordered_keys]
        else:
            cols = list(statement.table.c)

        clauses = []

        # the row alias form applies only to INSERT without a SELECT, on
        # dialects that flag the requirement
        requires_mysql8_alias = statement.select is None and (
            self.dialect._requires_alias_for_on_duplicate_key
        )

        if requires_mysql8_alias:
            # the alias is normally "new"; use "new_1" when the target table
            # is itself named "new" (compared case-insensitively)
            if statement.table.name.lower() == "new":  # type: ignore[union-attr] # noqa: E501
                _on_dup_alias_name = "new_1"
            else:
                _on_dup_alias_name = "new"

        # normalize the user-supplied keys to column-key strings
        on_duplicate_update = {
            coercions.expect_as_key(roles.DMLColumnRole, key): value
            for key, value in on_duplicate.update.items()
        }

        # traverses through all table columns to preserve table column order
        for column in (col for col in cols if col.key in on_duplicate_update):
            val = on_duplicate_update[column.key]

            # TODO: this coercion should be up front. we can't cache
            # SQL constructs with non-bound literals buried in them
            if coercions._is_literal(val):
                # plain Python value: bind it using the column's type
                val = elements.BindParameter(None, val, type_=column.type)
                value_text = self.process(val.self_group(), use_schema=False)
            else:

                # Defined per iteration and used immediately below, so the
                # closure sees the ``column`` of the current iteration.
                def replace(
                    element: ExternallyTraversible, **kw: Any
                ) -> Optional[ExternallyTraversible]:
                    if (
                        isinstance(element, elements.BindParameter)
                        and element.type._isnull
                    ):
                        # untyped bind inside the expression: give it the
                        # target column's type
                        return element._with_binary_element_type(column.type)
                    elif (
                        isinstance(element, elements.ColumnClause)
                        and element.table is on_duplicate.inserted_alias
                    ):
                        # reference to the row being inserted
                        if requires_mysql8_alias:
                            column_literal_clause = (
                                f"{_on_dup_alias_name}."
                                f"{self.preparer.quote(element.name)}"
                            )
                        else:
                            column_literal_clause = (
                                f"VALUES({self.preparer.quote(element.name)})"
                            )
                        return literal_column(column_literal_clause)
                    else:
                        # element is not replaced
                        return None

                val = visitors.replacement_traverse(val, {}, replace)
                value_text = self.process(val.self_group(), use_schema=False)

            name_text = self.preparer.quote(column.name)
            clauses.append("%s = %s" % (name_text, value_text))

        # update keys that correspond to no column are reported, not rendered
        non_matching = set(on_duplicate_update) - {c.key for c in cols}
        if non_matching:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.statement.table.name,  # type: ignore[union-attr]
                    (", ".join("'%s'" % c for c in non_matching)),
                )
            )

        if requires_mysql8_alias:
            return (
                f"AS {_on_dup_alias_name} "
                f"ON DUPLICATE KEY UPDATE {', '.join(clauses)}"
            )
        else:
            return f"ON DUPLICATE KEY UPDATE {', '.join(clauses)}"
1556
1557 def visit_concat_op_expression_clauselist(
1558 self, clauselist: elements.ClauseList, operator: Any, **kw: Any
1559 ) -> str:
1560 return "concat(%s)" % (
1561 ", ".join(self.process(elem, **kw) for elem in clauselist.clauses)
1562 )
1563
1564 def visit_concat_op_binary(
1565 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1566 ) -> str:
1567 return "concat(%s, %s)" % (
1568 self.process(binary.left, **kw),
1569 self.process(binary.right, **kw),
1570 )
1571
1572 _match_valid_flag_combinations = frozenset(
1573 (
1574 # (boolean_mode, natural_language, query_expansion)
1575 (False, False, False),
1576 (True, False, False),
1577 (False, True, False),
1578 (False, False, True),
1579 (False, True, True),
1580 )
1581 )
1582
1583 _match_flag_expressions = (
1584 "IN BOOLEAN MODE",
1585 "IN NATURAL LANGUAGE MODE",
1586 "WITH QUERY EXPANSION",
1587 )
1588
1589 def visit_mysql_match(self, element: expression.match, **kw: Any) -> str:
1590 return self.visit_match_op_binary(element, element.operator, **kw)
1591
1592 def visit_match_op_binary(
1593 self, binary: expression.match, operator: Any, **kw: Any
1594 ) -> str:
1595 """
1596 Note that `mysql_boolean_mode` is enabled by default because of
1597 backward compatibility
1598 """
1599
1600 modifiers = binary.modifiers
1601
1602 boolean_mode = modifiers.get("mysql_boolean_mode", True)
1603 natural_language = modifiers.get("mysql_natural_language", False)
1604 query_expansion = modifiers.get("mysql_query_expansion", False)
1605
1606 flag_combination = (boolean_mode, natural_language, query_expansion)
1607
1608 if flag_combination not in self._match_valid_flag_combinations:
1609 flags = (
1610 "in_boolean_mode=%s" % boolean_mode,
1611 "in_natural_language_mode=%s" % natural_language,
1612 "with_query_expansion=%s" % query_expansion,
1613 )
1614
1615 flags_str = ", ".join(flags)
1616
1617 raise exc.CompileError("Invalid MySQL match flags: %s" % flags_str)
1618
1619 match_clause = self.process(binary.left, **kw)
1620 against_clause = self.process(binary.right, **kw)
1621
1622 if any(flag_combination):
1623 flag_expressions = compress(
1624 self._match_flag_expressions,
1625 flag_combination,
1626 )
1627
1628 against_clause = " ".join([against_clause, *flag_expressions])
1629
1630 return "MATCH (%s) AGAINST (%s)" % (match_clause, against_clause)
1631
1632 def get_from_hint_text(
1633 self, table: selectable.FromClause, text: Optional[str]
1634 ) -> Optional[str]:
1635 return text
1636
1637 def visit_typeclause(
1638 self,
1639 typeclause: elements.TypeClause,
1640 type_: Optional[TypeEngine[Any]] = None,
1641 **kw: Any,
1642 ) -> Optional[str]:
1643 if type_ is None:
1644 type_ = typeclause.type.dialect_impl(self.dialect)
1645 if isinstance(type_, sqltypes.TypeDecorator):
1646 return self.visit_typeclause(typeclause, type_.impl, **kw) # type: ignore[arg-type] # noqa: E501
1647 elif isinstance(type_, sqltypes.Integer):
1648 if getattr(type_, "unsigned", False):
1649 return "UNSIGNED INTEGER"
1650 else:
1651 return "SIGNED INTEGER"
1652 elif isinstance(type_, sqltypes.TIMESTAMP):
1653 return "DATETIME"
1654 elif isinstance(
1655 type_,
1656 (
1657 sqltypes.DECIMAL,
1658 sqltypes.DateTime,
1659 sqltypes.Date,
1660 sqltypes.Time,
1661 ),
1662 ):
1663 return self.dialect.type_compiler_instance.process(type_)
1664 elif isinstance(type_, sqltypes.String) and not isinstance(
1665 type_, (ENUM, SET)
1666 ):
1667 adapted = CHAR._adapt_string_for_cast(type_)
1668 return self.dialect.type_compiler_instance.process(adapted)
1669 elif isinstance(type_, sqltypes._Binary):
1670 return "BINARY"
1671 elif isinstance(type_, sqltypes.JSON):
1672 return "JSON"
1673 elif isinstance(type_, sqltypes.NUMERIC):
1674 return self.dialect.type_compiler_instance.process(type_).replace(
1675 "NUMERIC", "DECIMAL"
1676 )
1677 elif (
1678 isinstance(type_, sqltypes.Float)
1679 and self.dialect._support_float_cast
1680 ):
1681 return self.dialect.type_compiler_instance.process(type_)
1682 else:
1683 return None
1684
1685 def visit_cast(self, cast: elements.Cast[Any], **kw: Any) -> str:
1686 type_ = self.process(cast.typeclause)
1687 if type_ is None:
1688 util.warn(
1689 "Datatype %s does not support CAST on MySQL/MariaDb; "
1690 "the CAST will be skipped."
1691 % self.dialect.type_compiler_instance.process(
1692 cast.typeclause.type
1693 )
1694 )
1695 return self.process(cast.clause.self_group(), **kw)
1696
1697 return "CAST(%s AS %s)" % (self.process(cast.clause, **kw), type_)
1698
1699 def render_literal_value(
1700 self, value: Optional[str], type_: TypeEngine[Any]
1701 ) -> str:
1702 value = super().render_literal_value(value, type_)
1703 if self.dialect._backslash_escapes:
1704 value = value.replace("\\", "\\\\")
1705 return value
1706
1707 # override native_boolean=False behavior here, as
1708 # MySQL still supports native boolean
1709 def visit_true(self, expr: elements.True_, **kw: Any) -> str:
1710 return "true"
1711
1712 def visit_false(self, expr: elements.False_, **kw: Any) -> str:
1713 return "false"
1714
1715 def get_select_precolumns(
1716 self, select: selectable.Select[Any], **kw: Any
1717 ) -> str:
1718 """Add special MySQL keywords in place of DISTINCT.
1719
1720 .. deprecated:: 1.4 This usage is deprecated.
1721 :meth:`_expression.Select.prefix_with` should be used for special
1722 keywords at the start of a SELECT.
1723
1724 """
1725 if isinstance(select._distinct, str):
1726 util.warn_deprecated(
1727 "Sending string values for 'distinct' is deprecated in the "
1728 "MySQL dialect and will be removed in a future release. "
1729 "Please use :meth:`.Select.prefix_with` for special keywords "
1730 "at the start of a SELECT statement",
1731 version="1.4",
1732 )
1733 return select._distinct.upper() + " "
1734
1735 return super().get_select_precolumns(select, **kw)
1736
1737 def visit_join(
1738 self,
1739 join: selectable.Join,
1740 asfrom: bool = False,
1741 from_linter: Optional[compiler.FromLinter] = None,
1742 **kwargs: Any,
1743 ) -> str:
1744 if from_linter:
1745 from_linter.edges.add((join.left, join.right))
1746
1747 if join.full:
1748 join_type = " FULL OUTER JOIN "
1749 elif join.isouter:
1750 join_type = " LEFT OUTER JOIN "
1751 else:
1752 join_type = " INNER JOIN "
1753
1754 return "".join(
1755 (
1756 self.process(
1757 join.left, asfrom=True, from_linter=from_linter, **kwargs
1758 ),
1759 join_type,
1760 self.process(
1761 join.right, asfrom=True, from_linter=from_linter, **kwargs
1762 ),
1763 " ON ",
1764 self.process(join.onclause, from_linter=from_linter, **kwargs), # type: ignore[arg-type] # noqa: E501
1765 )
1766 )
1767
1768 def for_update_clause(
1769 self, select: selectable.GenerativeSelect, **kw: Any
1770 ) -> str:
1771 assert select._for_update_arg is not None
1772 if select._for_update_arg.read:
1773 if self.dialect.use_mysql_for_share:
1774 tmp = " FOR SHARE"
1775 else:
1776 tmp = " LOCK IN SHARE MODE"
1777 else:
1778 tmp = " FOR UPDATE"
1779
1780 if select._for_update_arg.of and self.dialect.supports_for_update_of:
1781 tables: util.OrderedSet[elements.ClauseElement] = util.OrderedSet()
1782 for c in select._for_update_arg.of:
1783 tables.update(sql_util.surface_selectables_only(c))
1784
1785 tmp += " OF " + ", ".join(
1786 self.process(table, ashint=True, use_schema=False, **kw)
1787 for table in tables
1788 )
1789
1790 if select._for_update_arg.nowait:
1791 tmp += " NOWAIT"
1792
1793 if select._for_update_arg.skip_locked:
1794 tmp += " SKIP LOCKED"
1795
1796 return tmp
1797
1798 def limit_clause(
1799 self, select: selectable.GenerativeSelect, **kw: Any
1800 ) -> str:
1801 # MySQL supports:
1802 # LIMIT <limit>
1803 # LIMIT <offset>, <limit>
1804 # and in server versions > 3.3:
1805 # LIMIT <limit> OFFSET <offset>
1806 # The latter is more readable for offsets but we're stuck with the
1807 # former until we can refine dialects by server revision.
1808
1809 limit_clause, offset_clause = (
1810 select._limit_clause,
1811 select._offset_clause,
1812 )
1813
1814 if limit_clause is None and offset_clause is None:
1815 return ""
1816 elif offset_clause is not None:
1817 # As suggested by the MySQL docs, need to apply an
1818 # artificial limit if one wasn't provided
1819 # https://dev.mysql.com/doc/refman/5.0/en/select.html
1820 if limit_clause is None:
1821 # TODO: remove ??
1822 # hardwire the upper limit. Currently
1823 # needed consistent with the usage of the upper
1824 # bound as part of MySQL's "syntax" for OFFSET with
1825 # no LIMIT.
1826 return " \n LIMIT %s, %s" % (
1827 self.process(offset_clause, **kw),
1828 "18446744073709551615",
1829 )
1830 else:
1831 return " \n LIMIT %s, %s" % (
1832 self.process(offset_clause, **kw),
1833 self.process(limit_clause, **kw),
1834 )
1835 else:
1836 assert limit_clause is not None
1837 # No offset provided, so just use the limit
1838 return " \n LIMIT %s" % (self.process(limit_clause, **kw),)
1839
1840 def update_limit_clause(self, update_stmt: Update) -> Optional[str]:
1841 limit = update_stmt.kwargs.get("%s_limit" % self.dialect.name, None)
1842 if limit is not None:
1843 return f"LIMIT {int(limit)}"
1844 else:
1845 return None
1846
1847 def delete_limit_clause(self, delete_stmt: Delete) -> Optional[str]:
1848 limit = delete_stmt.kwargs.get("%s_limit" % self.dialect.name, None)
1849 if limit is not None:
1850 return f"LIMIT {int(limit)}"
1851 else:
1852 return None
1853
1854 def update_tables_clause(
1855 self,
1856 update_stmt: Update,
1857 from_table: _DMLTableElement,
1858 extra_froms: List[selectable.FromClause],
1859 **kw: Any,
1860 ) -> str:
1861 kw["asfrom"] = True
1862 return ", ".join(
1863 t._compiler_dispatch(self, **kw)
1864 for t in [from_table] + list(extra_froms)
1865 )
1866
1867 def update_from_clause(
1868 self,
1869 update_stmt: Update,
1870 from_table: _DMLTableElement,
1871 extra_froms: List[selectable.FromClause],
1872 from_hints: Any,
1873 **kw: Any,
1874 ) -> None:
1875 return None
1876
1877 def delete_table_clause(
1878 self,
1879 delete_stmt: Delete,
1880 from_table: _DMLTableElement,
1881 extra_froms: List[selectable.FromClause],
1882 **kw: Any,
1883 ) -> str:
1884 """If we have extra froms make sure we render any alias as hint."""
1885 ashint = False
1886 if extra_froms:
1887 ashint = True
1888 return from_table._compiler_dispatch(
1889 self, asfrom=True, iscrud=True, ashint=ashint, **kw
1890 )
1891
1892 def delete_extra_from_clause(
1893 self,
1894 delete_stmt: Delete,
1895 from_table: _DMLTableElement,
1896 extra_froms: List[selectable.FromClause],
1897 from_hints: Any,
1898 **kw: Any,
1899 ) -> str:
1900 """Render the DELETE .. USING clause specific to MySQL."""
1901 kw["asfrom"] = True
1902 return "USING " + ", ".join(
1903 t._compiler_dispatch(self, fromhints=from_hints, **kw)
1904 for t in [from_table] + extra_froms
1905 )
1906
1907 def visit_empty_set_expr(
1908 self, element_types: List[TypeEngine[Any]], **kw: Any
1909 ) -> str:
1910 return (
1911 "SELECT %(outer)s FROM (SELECT %(inner)s) "
1912 "as _empty_set WHERE 1!=1"
1913 % {
1914 "inner": ", ".join(
1915 "1 AS _in_%s" % idx
1916 for idx, type_ in enumerate(element_types)
1917 ),
1918 "outer": ", ".join(
1919 "_in_%s" % idx for idx, type_ in enumerate(element_types)
1920 ),
1921 }
1922 )
1923
1924 def visit_is_distinct_from_binary(
1925 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1926 ) -> str:
1927 return "NOT (%s <=> %s)" % (
1928 self.process(binary.left),
1929 self.process(binary.right),
1930 )
1931
1932 def visit_is_not_distinct_from_binary(
1933 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1934 ) -> str:
1935 return "%s <=> %s" % (
1936 self.process(binary.left),
1937 self.process(binary.right),
1938 )
1939
1940 def _mariadb_regexp_flags(
1941 self, flags: str, pattern: elements.ColumnElement[Any], **kw: Any
1942 ) -> str:
1943 return "CONCAT('(?', %s, ')', %s)" % (
1944 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1945 self.process(pattern, **kw),
1946 )
1947
1948 def _regexp_match(
1949 self,
1950 op_string: str,
1951 binary: elements.BinaryExpression[Any],
1952 operator: Any,
1953 **kw: Any,
1954 ) -> str:
1955 assert binary.modifiers is not None
1956 flags = binary.modifiers["flags"]
1957 if flags is None:
1958 return self._generate_generic_binary(binary, op_string, **kw)
1959 elif self.dialect.is_mariadb:
1960 return "%s%s%s" % (
1961 self.process(binary.left, **kw),
1962 op_string,
1963 self._mariadb_regexp_flags(flags, binary.right),
1964 )
1965 else:
1966 text = "REGEXP_LIKE(%s, %s, %s)" % (
1967 self.process(binary.left, **kw),
1968 self.process(binary.right, **kw),
1969 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1970 )
1971 if op_string == " NOT REGEXP ":
1972 return "NOT %s" % text
1973 else:
1974 return text
1975
1976 def visit_regexp_match_op_binary(
1977 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1978 ) -> str:
1979 return self._regexp_match(" REGEXP ", binary, operator, **kw)
1980
1981 def visit_not_regexp_match_op_binary(
1982 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1983 ) -> str:
1984 return self._regexp_match(" NOT REGEXP ", binary, operator, **kw)
1985
1986 def visit_regexp_replace_op_binary(
1987 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1988 ) -> str:
1989 assert binary.modifiers is not None
1990 flags = binary.modifiers["flags"]
1991 if flags is None:
1992 return "REGEXP_REPLACE(%s, %s)" % (
1993 self.process(binary.left, **kw),
1994 self.process(binary.right, **kw),
1995 )
1996 elif self.dialect.is_mariadb:
1997 return "REGEXP_REPLACE(%s, %s, %s)" % (
1998 self.process(binary.left, **kw),
1999 self._mariadb_regexp_flags(flags, binary.right.clauses[0]),
2000 self.process(binary.right.clauses[1], **kw),
2001 )
2002 else:
2003 return "REGEXP_REPLACE(%s, %s, %s)" % (
2004 self.process(binary.left, **kw),
2005 self.process(binary.right, **kw),
2006 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2007 )
2008
2009
class MySQLDDLCompiler(compiler.DDLCompiler):
    """DDL compiler rendering MySQL / MariaDB specific CREATE, ALTER and
    DROP constructs.

    """

    dialect: MySQLDialect

    def get_column_specification(
        self, column: sa_schema.Column[Any], **kw: Any
    ) -> str:
        """Builds column DDL.

        Renders, in order: name, type, computed expression, nullability,
        ``COMMENT``, then either ``AUTO_INCREMENT`` or the ``DEFAULT``
        clause.

        """
        if (
            self.dialect.is_mariadb is True
            and column.computed is not None
            and column._user_defined_nullable is SchemaConst.NULL_UNSPECIFIED
        ):
            # on MariaDB, a computed column whose nullability was not
            # given explicitly is treated as nullable
            column.nullable = True
        colspec = [
            self.preparer.format_column(column),
            self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            ),
        ]

        if column.computed is not None:
            colspec.append(self.process(column.computed))

        is_timestamp = isinstance(
            column.type._unwrapped_dialect_impl(self.dialect),
            sqltypes.TIMESTAMP,
        )

        if not column.nullable:
            colspec.append("NOT NULL")
        elif column.nullable and is_timestamp:
            # nullable TIMESTAMP columns get an explicit NULL; see:
            # https://docs.sqlalchemy.org/en/latest/dialects/mysql.html#mysql_timestamp_null # noqa
            colspec.append("NULL")

        comment = column.comment
        if comment is not None:
            literal = self.sql_compiler.render_literal_value(
                comment, sqltypes.String()
            )
            colspec.append("COMMENT " + literal)

        if (
            column.table is not None
            and column is column.table._autoincrement_column
            and (
                column.server_default is None
                or isinstance(column.server_default, sa_schema.Identity)
            )
            and not (
                self.dialect.supports_sequences
                and isinstance(column.default, sa_schema.Sequence)
                and not column.default.optional
            )
        ):
            colspec.append("AUTO_INCREMENT")
        else:
            default = self.get_column_default_string(column)

            if default is not None:
                # parenthesize the default only if the dialect supports
                # function defaults and the text is neither quoted /
                # parenthesized already, nor an ON UPDATE clause, nor a
                # now(n) / current_timestamp(n) call, and it contains at
                # least one non-word character
                if (
                    self.dialect._support_default_function
                    and not re.match(r"^\s*[\'\"\(]", default)
                    and not re.search(r"ON +UPDATE", default, re.I)
                    and not re.match(
                        r"\bnow\(\d+\)|\bcurrent_timestamp\(\d+\)",
                        default,
                        re.I,
                    )
                    and re.match(r".*\W.*", default)
                ):
                    colspec.append(f"DEFAULT ({default})")
                else:
                    colspec.append("DEFAULT " + default)
        return " ".join(colspec)

    def post_create_table(self, table: sa_schema.Table) -> str:
        """Build table-level CREATE options like ENGINE and COLLATE."""

        table_opts = []

        # collect "<dialect name>_xyz" table arguments as upper-cased
        # option names with the dialect prefix removed
        opts = {
            k[len(self.dialect.name) + 1 :].upper(): v
            for k, v in table.kwargs.items()
            if k.startswith("%s_" % self.dialect.name)
        }

        if table.comment is not None:
            opts["COMMENT"] = table.comment

        partition_options = [
            "PARTITION_BY",
            "PARTITIONS",
            "SUBPARTITIONS",
            "SUBPARTITION_BY",
        ]

        nonpart_options = set(opts).difference(partition_options)
        part_options = set(opts).intersection(partition_options)

        # character set options are ordered ahead of COLLATE
        for opt in topological.sort(
            [
                ("DEFAULT_CHARSET", "COLLATE"),
                ("DEFAULT_CHARACTER_SET", "COLLATE"),
                ("CHARSET", "COLLATE"),
                ("CHARACTER_SET", "COLLATE"),
            ],
            nonpart_options,
        ):
            arg = opts[opt]
            if opt in _reflection._options_of_type_string:
                arg = self.sql_compiler.render_literal_value(
                    arg, sqltypes.String()
                )

            if opt in (
                "DATA_DIRECTORY",
                "INDEX_DIRECTORY",
                "DEFAULT_CHARACTER_SET",
                "CHARACTER_SET",
                "DEFAULT_CHARSET",
                "DEFAULT_COLLATE",
            ):
                opt = opt.replace("_", " ")

            joiner = "="
            if opt in (
                "TABLESPACE",
                "DEFAULT CHARACTER SET",
                "CHARACTER SET",
                "COLLATE",
            ):
                joiner = " "

            table_opts.append(joiner.join((opt, arg)))

        # partitioning options are rendered last, in dependency order
        for opt in topological.sort(
            [
                ("PARTITION_BY", "PARTITIONS"),
                ("PARTITION_BY", "SUBPARTITION_BY"),
                ("PARTITION_BY", "SUBPARTITIONS"),
                ("PARTITIONS", "SUBPARTITIONS"),
                ("PARTITIONS", "SUBPARTITION_BY"),
                ("SUBPARTITION_BY", "SUBPARTITIONS"),
            ],
            part_options,
        ):
            arg = opts[opt]
            if opt in _reflection._options_of_type_string:
                arg = self.sql_compiler.render_literal_value(
                    arg, sqltypes.String()
                )

            opt = opt.replace("_", " ")
            joiner = " "

            table_opts.append(joiner.join((opt, arg)))

        return " ".join(table_opts)

    def visit_create_index(self, create: ddl.CreateIndex, **kw: Any) -> str:  # type: ignore[override] # noqa: E501
        """Render CREATE INDEX including the dialect-specific ``prefix``,
        ``length``, ``with_parser`` and ``using`` index options.

        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        table = preparer.format_table(index.table)  # type: ignore[arg-type]

        # binary expressions, functions and unary expressions other than
        # ASC / DESC are wrapped in a Grouping before rendering
        columns = [
            self.sql_compiler.process(
                (
                    elements.Grouping(expr)  # type: ignore[arg-type]
                    if (
                        isinstance(expr, elements.BinaryExpression)
                        or (
                            isinstance(expr, elements.UnaryExpression)
                            and expr.modifier
                            not in (operators.desc_op, operators.asc_op)
                        )
                        or isinstance(expr, functions.FunctionElement)
                    )
                    else expr
                ),
                include_table=False,
                literal_binds=True,
            )
            for expr in index.expressions
        ]

        name = self._prepared_index_name(index)

        text = "CREATE "
        if index.unique:
            text += "UNIQUE "

        index_prefix = index.kwargs.get("%s_prefix" % self.dialect.name, None)
        if index_prefix:
            text += index_prefix + " "

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        text += "%s ON %s " % (name, table)

        length = index.dialect_options[self.dialect.name]["length"]
        if length is not None:
            if isinstance(length, dict):
                # length value can be a (column_name --> integer value)
                # mapping specifying the prefix length for each column of
                # the index
                rendered_columns = []
                for col, expr in zip(index.expressions, columns):
                    # not every index expression has a ``name`` (e.g.
                    # ``col.desc()``); fall through to the lookup by
                    # rendered expression instead of raising
                    col_name = getattr(col, "name", None)
                    if col_name is not None and col_name in length:
                        rendered_columns.append(
                            "%s(%d)" % (expr, length[col_name])
                        )
                    elif expr in length:
                        rendered_columns.append(
                            "%s(%d)" % (expr, length[expr])
                        )
                    else:
                        rendered_columns.append("%s" % expr)
                columns_str = ", ".join(rendered_columns)
            else:
                # or can be an integer value specifying the same
                # prefix length for all columns of the index
                columns_str = ", ".join(
                    "%s(%d)" % (col, length) for col in columns
                )
        else:
            columns_str = ", ".join(columns)
        text += "(%s)" % columns_str

        # NOTE(review): "with_parser" and "using" are looked up under the
        # literal "mysql" key while "length" / "prefix" use the dialect
        # name; confirm this is intended for the "mariadb" dialect name.
        parser = index.dialect_options["mysql"]["with_parser"]
        if parser is not None:
            text += " WITH PARSER %s" % (parser,)

        using = index.dialect_options["mysql"]["using"]
        if using is not None:
            text += " USING %s" % (preparer.quote(using))

        return text

    def visit_primary_key_constraint(
        self, constraint: sa_schema.PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render a PRIMARY KEY constraint, appending ``USING <type>``
        when the ``mysql_using`` option is set.

        """
        text = super().visit_primary_key_constraint(constraint, **kw)
        using = constraint.dialect_options["mysql"]["using"]
        if using:
            text += " USING %s" % (self.preparer.quote(using))
        return text

    def visit_drop_index(self, drop: ddl.DropIndex, **kw: Any) -> str:
        """Render ``DROP INDEX [IF EXISTS] <name> ON <table>``; the index
        name is rendered without a schema.

        """
        index = drop.element
        text = "\nDROP INDEX "
        if drop.if_exists:
            text += "IF EXISTS "

        return text + "%s ON %s" % (
            self._prepared_index_name(index, include_schema=False),
            self.preparer.format_table(index.table),  # type: ignore[arg-type]
        )

    def visit_drop_constraint(
        self, drop: ddl.DropConstraint, **kw: Any
    ) -> str:
        """Render ``ALTER TABLE ... DROP`` with a qualifier chosen by
        constraint type.

        Foreign keys use ``FOREIGN KEY``, primary keys ``PRIMARY KEY``
        without a name, unique constraints ``INDEX``, and check
        constraints ``CONSTRAINT`` on MariaDB or ``CHECK`` otherwise.

        """
        constraint = drop.element
        if isinstance(constraint, sa_schema.ForeignKeyConstraint):
            qual = "FOREIGN KEY "
            const = self.preparer.format_constraint(constraint)
        elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
            qual = "PRIMARY KEY "
            const = ""
        elif isinstance(constraint, sa_schema.UniqueConstraint):
            qual = "INDEX "
            const = self.preparer.format_constraint(constraint)
        elif isinstance(constraint, sa_schema.CheckConstraint):
            if self.dialect.is_mariadb:
                qual = "CONSTRAINT "
            else:
                qual = "CHECK "
            const = self.preparer.format_constraint(constraint)
        else:
            qual = ""
            const = self.preparer.format_constraint(constraint)
        return "ALTER TABLE %s DROP %s%s" % (
            self.preparer.format_table(constraint.table),
            qual,
            const,
        )

    def define_constraint_match(
        self, constraint: sa_schema.ForeignKeyConstraint
    ) -> str:
        """Reject the ``MATCH`` option of a foreign key.

        :raises CompileError: if ``constraint.match`` is set.
        :return: an empty string otherwise.

        """
        if constraint.match is not None:
            raise exc.CompileError(
                "MySQL ignores the 'MATCH' keyword while at the same time "
                "causes ON UPDATE/ON DELETE clauses to be ignored."
            )
        return ""

    def visit_set_table_comment(
        self, create: ddl.SetTableComment, **kw: Any
    ) -> str:
        """Render ``ALTER TABLE ... COMMENT '<comment>'``."""
        return "ALTER TABLE %s COMMENT %s" % (
            self.preparer.format_table(create.element),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_table_comment(
        self, drop: ddl.DropTableComment, **kw: Any
    ) -> str:
        """Render the removal of a table comment by setting it to an
        empty string.

        """
        return "ALTER TABLE %s COMMENT ''" % (
            self.preparer.format_table(drop.element)
        )

    def visit_set_column_comment(
        self, create: ddl.SetColumnComment, **kw: Any
    ) -> str:
        """Render ``ALTER TABLE ... CHANGE`` with the full column
        specification, which includes the comment.

        """
        return "ALTER TABLE %s CHANGE %s %s" % (
            self.preparer.format_table(create.element.table),
            self.preparer.format_column(create.element),
            self.get_column_specification(create.element),
        )

    def get_identity_options(self, identity_options: IdentityOptions) -> str:
        """mariadb-specific sequence option; this will move to a
        mariadb-specific module in 2.1

        """
        text = super().get_identity_options(identity_options)
        text = text.replace("NO CYCLE", "NOCYCLE")
        return text
2342
2343
class MySQLTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler rendering MySQL / MariaDB column type DDL."""

    def _extend_numeric(self, type_: _NumericType, spec: str) -> str:
        "Extend a numeric-type declaration with MySQL specific extensions."

        if not self._mysql_type(type_):
            return spec

        if type_.unsigned:
            spec += " UNSIGNED"
        if type_.zerofill:
            spec += " ZEROFILL"
        return spec

    def _extend_string(
        self, type_: _StringType, defaults: Dict[str, Any], spec: str
    ) -> str:
        """Extend a string-type declaration with standard SQL CHARACTER SET /
        COLLATE annotations and MySQL specific extensions.

        :param type_: the type being rendered.
        :param defaults: values used for any option that is not an
         attribute of ``type_``.
        :param spec: the base type declaration, e.g. ``VARCHAR(30)``.

        """

        def attr(name: str) -> Any:
            return getattr(type_, name, defaults.get(name))

        if attr("charset"):
            charset = "CHARACTER SET %s" % attr("charset")
        elif attr("ascii"):
            charset = "ASCII"
        elif attr("unicode"):
            charset = "UNICODE"
        else:
            charset = None

        if attr("collation"):
            # read through attr() like every other option so that a
            # collation supplied only via ``defaults`` is rendered too
            collation = "COLLATE %s" % attr("collation")
        elif attr("binary"):
            collation = "BINARY"
        else:
            collation = None

        if attr("national"):
            # NATIONAL (aka NCHAR/NVARCHAR) trumps charsets.
            return " ".join(
                [c for c in ("NATIONAL", spec, collation) if c is not None]
            )
        return " ".join(
            [c for c in (spec, charset, collation) if c is not None]
        )

    def _mysql_type(self, type_: Any) -> bool:
        """Return True if ``type_`` is one of this dialect's string or
        numeric types.

        """
        return isinstance(type_, (_StringType, _NumericType))

    def visit_NUMERIC(self, type_: NUMERIC, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render NUMERIC with optional precision and scale."""
        if type_.precision is None:
            return self._extend_numeric(type_, "NUMERIC")
        elif type_.scale is None:
            return self._extend_numeric(
                type_,
                "NUMERIC(%(precision)s)" % {"precision": type_.precision},
            )
        else:
            return self._extend_numeric(
                type_,
                "NUMERIC(%(precision)s, %(scale)s)"
                % {"precision": type_.precision, "scale": type_.scale},
            )

    def visit_DECIMAL(self, type_: DECIMAL, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render DECIMAL with optional precision and scale."""
        if type_.precision is None:
            return self._extend_numeric(type_, "DECIMAL")
        elif type_.scale is None:
            return self._extend_numeric(
                type_,
                "DECIMAL(%(precision)s)" % {"precision": type_.precision},
            )
        else:
            return self._extend_numeric(
                type_,
                "DECIMAL(%(precision)s, %(scale)s)"
                % {"precision": type_.precision, "scale": type_.scale},
            )

    def visit_DOUBLE(self, type_: DOUBLE, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render DOUBLE; precision and scale are rendered only when
        both are set.

        """
        if type_.precision is not None and type_.scale is not None:
            return self._extend_numeric(
                type_,
                "DOUBLE(%(precision)s, %(scale)s)"
                % {"precision": type_.precision, "scale": type_.scale},
            )
        else:
            return self._extend_numeric(type_, "DOUBLE")

    def visit_REAL(self, type_: REAL, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render REAL; precision and scale are rendered only when both
        are set.

        """
        if type_.precision is not None and type_.scale is not None:
            return self._extend_numeric(
                type_,
                "REAL(%(precision)s, %(scale)s)"
                % {"precision": type_.precision, "scale": type_.scale},
            )
        else:
            return self._extend_numeric(type_, "REAL")

    def visit_FLOAT(self, type_: FLOAT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render FLOAT with precision and scale, precision only, or
        neither.

        """
        if (
            self._mysql_type(type_)
            and type_.scale is not None
            and type_.precision is not None
        ):
            return self._extend_numeric(
                type_, "FLOAT(%s, %s)" % (type_.precision, type_.scale)
            )
        elif type_.precision is not None:
            return self._extend_numeric(
                type_, "FLOAT(%s)" % (type_.precision,)
            )
        else:
            return self._extend_numeric(type_, "FLOAT")

    def visit_INTEGER(self, type_: INTEGER, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render INTEGER with an optional display width."""
        if self._mysql_type(type_) and type_.display_width is not None:
            return self._extend_numeric(
                type_,
                "INTEGER(%(display_width)s)"
                % {"display_width": type_.display_width},
            )
        else:
            return self._extend_numeric(type_, "INTEGER")

    def visit_BIGINT(self, type_: BIGINT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render BIGINT with an optional display width."""
        if self._mysql_type(type_) and type_.display_width is not None:
            return self._extend_numeric(
                type_,
                "BIGINT(%(display_width)s)"
                % {"display_width": type_.display_width},
            )
        else:
            return self._extend_numeric(type_, "BIGINT")

    def visit_MEDIUMINT(self, type_: MEDIUMINT, **kw: Any) -> str:
        """Render MEDIUMINT with an optional display width."""
        if self._mysql_type(type_) and type_.display_width is not None:
            return self._extend_numeric(
                type_,
                "MEDIUMINT(%(display_width)s)"
                % {"display_width": type_.display_width},
            )
        else:
            return self._extend_numeric(type_, "MEDIUMINT")

    def visit_TINYINT(self, type_: TINYINT, **kw: Any) -> str:
        """Render TINYINT with an optional display width."""
        if self._mysql_type(type_) and type_.display_width is not None:
            return self._extend_numeric(
                type_, "TINYINT(%s)" % type_.display_width
            )
        else:
            return self._extend_numeric(type_, "TINYINT")

    def visit_SMALLINT(self, type_: SMALLINT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render SMALLINT with an optional display width."""
        if self._mysql_type(type_) and type_.display_width is not None:
            return self._extend_numeric(
                type_,
                "SMALLINT(%(display_width)s)"
                % {"display_width": type_.display_width},
            )
        else:
            return self._extend_numeric(type_, "SMALLINT")

    def visit_BIT(self, type_: BIT, **kw: Any) -> str:
        """Render BIT with an optional length."""
        if type_.length is not None:
            return "BIT(%s)" % type_.length
        else:
            return "BIT"

    def visit_DATETIME(self, type_: DATETIME, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render DATETIME, with ``fsp`` in parentheses when it is set
        and non-zero.

        """
        if getattr(type_, "fsp", None):
            return "DATETIME(%d)" % type_.fsp  # type: ignore[str-format]
        else:
            return "DATETIME"

    def visit_DATE(self, type_: DATE, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render DATE."""
        return "DATE"

    def visit_TIME(self, type_: TIME, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render TIME, with ``fsp`` in parentheses when it is set and
        non-zero.

        """
        if getattr(type_, "fsp", None):
            return "TIME(%d)" % type_.fsp  # type: ignore[str-format]
        else:
            return "TIME"

    def visit_TIMESTAMP(self, type_: TIMESTAMP, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render TIMESTAMP, with ``fsp`` in parentheses when it is set
        and non-zero.

        """
        if getattr(type_, "fsp", None):
            return "TIMESTAMP(%d)" % type_.fsp  # type: ignore[str-format]
        else:
            return "TIMESTAMP"

    def visit_YEAR(self, type_: YEAR, **kw: Any) -> str:
        """Render YEAR with an optional display width."""
        if type_.display_width is None:
            return "YEAR"
        else:
            return "YEAR(%s)" % type_.display_width

    def visit_TEXT(self, type_: TEXT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render TEXT with an optional length and string options."""
        if type_.length is not None:
            return self._extend_string(type_, {}, "TEXT(%d)" % type_.length)
        else:
            return self._extend_string(type_, {}, "TEXT")

    def visit_TINYTEXT(self, type_: TINYTEXT, **kw: Any) -> str:
        """Render TINYTEXT with string options."""
        return self._extend_string(type_, {}, "TINYTEXT")

    def visit_MEDIUMTEXT(self, type_: MEDIUMTEXT, **kw: Any) -> str:
        """Render MEDIUMTEXT with string options."""
        return self._extend_string(type_, {}, "MEDIUMTEXT")

    def visit_LONGTEXT(self, type_: LONGTEXT, **kw: Any) -> str:
        """Render LONGTEXT with string options."""
        return self._extend_string(type_, {}, "LONGTEXT")

    def visit_VARCHAR(self, type_: VARCHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render VARCHAR.

        :raises CompileError: if the type has no length.

        """
        if type_.length is not None:
            return self._extend_string(type_, {}, "VARCHAR(%d)" % type_.length)
        else:
            raise exc.CompileError(
                "VARCHAR requires a length on dialect %s" % self.dialect.name
            )

    def visit_CHAR(self, type_: CHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render CHAR with an optional length and string options."""
        if type_.length is not None:
            return self._extend_string(
                type_, {}, "CHAR(%(length)s)" % {"length": type_.length}
            )
        else:
            return self._extend_string(type_, {}, "CHAR")

    def visit_NVARCHAR(self, type_: NVARCHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render NVARCHAR as ``NATIONAL VARCHAR``.

        :raises CompileError: if the type has no length.

        """
        # We'll actually generate the equiv. "NATIONAL VARCHAR" instead
        # of "NVARCHAR".
        if type_.length is not None:
            return self._extend_string(
                type_,
                {"national": True},
                "VARCHAR(%(length)s)" % {"length": type_.length},
            )
        else:
            raise exc.CompileError(
                "NVARCHAR requires a length on dialect %s" % self.dialect.name
            )

    def visit_NCHAR(self, type_: NCHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render NCHAR as ``NATIONAL CHAR`` with an optional length."""
        # We'll actually generate the equiv.
        # "NATIONAL CHAR" instead of "NCHAR".
        if type_.length is not None:
            return self._extend_string(
                type_,
                {"national": True},
                "CHAR(%(length)s)" % {"length": type_.length},
            )
        else:
            return self._extend_string(type_, {"national": True}, "CHAR")

    def visit_UUID(self, type_: UUID[Any], **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render UUID."""
        return "UUID"

    def visit_VARBINARY(self, type_: VARBINARY, **kw: Any) -> str:
        """Render VARBINARY.

        :raises CompileError: if the type has no length, consistent with
         VARCHAR and NVARCHAR.

        """
        if type_.length is None:
            raise exc.CompileError(
                "VARBINARY requires a length on dialect %s"
                % self.dialect.name
            )
        return "VARBINARY(%d)" % type_.length

    def visit_JSON(self, type_: JSON, **kw: Any) -> str:
        """Render JSON."""
        return "JSON"

    def visit_large_binary(self, type_: LargeBinary, **kw: Any) -> str:
        """Render the generic LargeBinary type as BLOB."""
        return self.visit_BLOB(type_, **kw)

    def visit_enum(self, type_: ENUM, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        """Render the generic Enum type; a native enum is rendered as
        ``ENUM(...)``, otherwise the superclass rendering is used.

        """
        if not type_.native_enum:
            return super().visit_enum(type_, **kw)
        else:
            return self._visit_enumerated_values("ENUM", type_, type_.enums)

    def visit_BLOB(self, type_: LargeBinary, **kw: Any) -> str:
        """Render BLOB with an optional length."""
        if type_.length is not None:
            return "BLOB(%d)" % type_.length
        else:
            return "BLOB"

    def visit_TINYBLOB(self, type_: TINYBLOB, **kw: Any) -> str:
        """Render TINYBLOB."""
        return "TINYBLOB"

    def visit_MEDIUMBLOB(self, type_: MEDIUMBLOB, **kw: Any) -> str:
        """Render MEDIUMBLOB."""
        return "MEDIUMBLOB"

    def visit_LONGBLOB(self, type_: LONGBLOB, **kw: Any) -> str:
        """Render LONGBLOB."""
        return "LONGBLOB"

    def _visit_enumerated_values(
        self, name: str, type_: _StringType, enumerated_values: Sequence[str]
    ) -> str:
        """Render ``<name>('a','b',...)`` followed by string options.

        Single quotes inside values are doubled; percent signs are
        doubled as well when the identifier preparer has
        ``_double_percents`` set.

        """
        quoted_enums = []
        for e in enumerated_values:
            if self.dialect.identifier_preparer._double_percents:
                e = e.replace("%", "%%")
            quoted_enums.append("'%s'" % e.replace("'", "''"))
        return self._extend_string(
            type_, {}, "%s(%s)" % (name, ",".join(quoted_enums))
        )

    def visit_ENUM(self, type_: ENUM, **kw: Any) -> str:
        """Render the MySQL ENUM type."""
        return self._visit_enumerated_values("ENUM", type_, type_.enums)

    def visit_SET(self, type_: SET, **kw: Any) -> str:
        """Render the MySQL SET type."""
        return self._visit_enumerated_values("SET", type_, type_.values)

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        """Render BOOLEAN as ``BOOL``."""
        return "BOOL"
2654
2655
class MySQLIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer quoting with backticks, or with double quotes
    when the server uses ANSI quotes.

    """

    reserved_words = RESERVED_WORDS_MYSQL

    def __init__(
        self,
        dialect: default.DefaultDialect,
        server_ansiquotes: bool = False,
        **kw: Any,
    ):
        # the same character serves as both the opening quote and the
        # escape for an embedded quote
        quote_char = '"' if server_ansiquotes else "`"

        super().__init__(
            dialect, initial_quote=quote_char, escape_quote=quote_char
        )

    def _quote_free_identifiers(self, *ids: Optional[str]) -> Tuple[str, ...]:
        """Unilaterally identifier-quote any number of strings.

        ``None`` entries are skipped.

        """

        return tuple(
            self.quote_identifier(ident) for ident in ids if ident is not None
        )
2676
2677
class MariaDBIdentifierPreparer(MySQLIdentifierPreparer):
    """Identifier preparer that uses the MariaDB reserved word list."""

    reserved_words = RESERVED_WORDS_MARIADB
2680
2681
2682class MySQLDialect(default.DefaultDialect):
2683 """Details of the MySQL dialect.
2684 Not used directly in application code.
2685 """
2686
2687 name = "mysql"
2688 supports_statement_cache = True
2689
2690 supports_alter = True
2691
2692 # MySQL has no true "boolean" type; we
2693 # allow for the "true" and "false" keywords, however
2694 supports_native_boolean = False
2695
2696 # support for BIT type; mysqlconnector coerces result values automatically,
2697 # all other MySQL DBAPIs require a conversion routine
2698 supports_native_bit = False
2699
2700 # identifiers are 64, however aliases can be 255...
2701 max_identifier_length = 255
2702 max_index_name_length = 64
2703 max_constraint_name_length = 64
2704
2705 div_is_floordiv = False
2706
2707 supports_native_enum = True
2708
2709 returns_native_bytes = True
2710
2711 # ... may be updated to True for MariaDB 10.3+ in initialize()
2712 supports_sequences = False
2713
2714 sequences_optional = False
2715
2716 # ... may be updated to True for MySQL 8+ in initialize()
2717 supports_for_update_of = False
2718
2719 # mysql 8.0.1 uses this syntax
2720 use_mysql_for_share = False
2721
2722 # Only available ... ... in MySQL 8+
2723 _requires_alias_for_on_duplicate_key = False
2724
2725 # MySQL doesn't support "DEFAULT VALUES" but *does* support
2726 # "VALUES (DEFAULT)"
2727 supports_default_values = False
2728 supports_default_metavalue = True
2729
2730 use_insertmanyvalues: bool = True
2731 insertmanyvalues_implicit_sentinel = (
2732 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
2733 )
2734
2735 supports_sane_rowcount = True
2736 supports_sane_multi_rowcount = False
2737 supports_multivalues_insert = True
2738 insert_null_pk_still_autoincrements = True
2739
2740 supports_comments = True
2741 inline_comments = True
2742 default_paramstyle = "format"
2743 colspecs = colspecs
2744
2745 cte_follows_insert = True
2746
2747 statement_compiler = MySQLCompiler
2748 ddl_compiler = MySQLDDLCompiler
2749 type_compiler_cls = MySQLTypeCompiler
2750 ischema_names = ischema_names
2751 preparer: type[MySQLIdentifierPreparer] = MySQLIdentifierPreparer
2752
2753 is_mariadb: bool = False
2754 _mariadb_normalized_version_info = None
2755
2756 # default SQL compilation settings -
2757 # these are modified upon initialize(),
2758 # i.e. first connect
2759 _backslash_escapes = True
2760 _server_ansiquotes = False
2761
2762 server_version_info: Tuple[int, ...]
2763 identifier_preparer: MySQLIdentifierPreparer
2764
2765 construct_arguments = [
2766 (sa_schema.Table, {"*": None}),
2767 (sql.Update, {"limit": None}),
2768 (sql.Delete, {"limit": None}),
2769 (sa_schema.PrimaryKeyConstraint, {"using": None}),
2770 (
2771 sa_schema.Index,
2772 {
2773 "using": None,
2774 "length": None,
2775 "prefix": None,
2776 "with_parser": None,
2777 },
2778 ),
2779 ]
2780
    def __init__(
        self,
        json_serializer: Optional[Callable[..., Any]] = None,
        json_deserializer: Optional[Callable[..., Any]] = None,
        is_mariadb: Optional[bool] = None,
        **kwargs: Any,
    ) -> None:
        """Construct the dialect.

        :param json_serializer: stored as ``_json_serializer``.
        :param json_deserializer: stored as ``_json_deserializer``.
        :param is_mariadb: passed to ``_set_mariadb()`` together with an
         empty server version tuple.
        :param kwargs: passed to ``DefaultDialect.__init__()``; a legacy
         ``use_ansiquotes`` entry is discarded first.

        """
        kwargs.pop("use_ansiquotes", None)  # legacy
        default.DefaultDialect.__init__(self, **kwargs)
        self._json_serializer = json_serializer
        self._json_deserializer = json_deserializer
        # no server version is known at construction time
        self._set_mariadb(is_mariadb, ())
2793
2794 def get_isolation_level_values(
2795 self, dbapi_conn: DBAPIConnection
2796 ) -> Sequence[IsolationLevel]:
2797 return (
2798 "SERIALIZABLE",
2799 "READ UNCOMMITTED",
2800 "READ COMMITTED",
2801 "REPEATABLE READ",
2802 )
2803
2804 def set_isolation_level(
2805 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
2806 ) -> None:
2807 cursor = dbapi_connection.cursor()
2808 cursor.execute(f"SET SESSION TRANSACTION ISOLATION LEVEL {level}")
2809 cursor.execute("COMMIT")
2810 cursor.close()
2811
2812 def get_isolation_level(
2813 self, dbapi_connection: DBAPIConnection
2814 ) -> IsolationLevel:
2815 cursor = dbapi_connection.cursor()
2816 if self._is_mysql and self.server_version_info >= (5, 7, 20):
2817 cursor.execute("SELECT @@transaction_isolation")
2818 else:
2819 cursor.execute("SELECT @@tx_isolation")
2820 row = cursor.fetchone()
2821 if row is None:
2822 util.warn(
2823 "Could not retrieve transaction isolation level for MySQL "
2824 "connection."
2825 )
2826 raise NotImplementedError()
2827 val = row[0]
2828 cursor.close()
2829 if isinstance(val, bytes):
2830 val = val.decode()
2831 return val.upper().replace("-", " ") # type: ignore[no-any-return]
2832
2833 @classmethod
2834 def _is_mariadb_from_url(cls, url: URL) -> bool:
2835 dbapi = cls.import_dbapi()
2836 dialect = cls(dbapi=dbapi)
2837
2838 cargs, cparams = dialect.create_connect_args(url)
2839 conn = dialect.connect(*cargs, **cparams)
2840 try:
2841 cursor = conn.cursor()
2842 cursor.execute("SELECT VERSION() LIKE '%MariaDB%'")
2843 val = cursor.fetchone()[0] # type: ignore[index]
2844 except:
2845 raise
2846 else:
2847 return bool(val)
2848 finally:
2849 conn.close()
2850
2851 def _get_server_version_info(
2852 self, connection: Connection
2853 ) -> Tuple[int, ...]:
2854 # get database server version info explicitly over the wire
2855 # to avoid proxy servers like MaxScale getting in the
2856 # way with their own values, see #4205
2857 dbapi_con = connection.connection
2858 cursor = dbapi_con.cursor()
2859 cursor.execute("SELECT VERSION()")
2860
2861 val = cursor.fetchone()[0] # type: ignore[index]
2862 cursor.close()
2863 if isinstance(val, bytes):
2864 val = val.decode()
2865
2866 return self._parse_server_version(val)
2867
2868 def _parse_server_version(self, val: str) -> Tuple[int, ...]:
2869 version: List[int] = []
2870 is_mariadb = False
2871
2872 r = re.compile(r"[.\-+]")
2873 tokens = r.split(val)
2874 for token in tokens:
2875 parsed_token = re.match(
2876 r"^(?:(\d+)(?:a|b|c)?|(MariaDB\w*))$", token
2877 )
2878 if not parsed_token:
2879 continue
2880 elif parsed_token.group(2):
2881 self._mariadb_normalized_version_info = tuple(version[-3:])
2882 is_mariadb = True
2883 else:
2884 digit = int(parsed_token.group(1))
2885 version.append(digit)
2886
2887 server_version_info = tuple(version)
2888
2889 self._set_mariadb(
2890 bool(server_version_info and is_mariadb), server_version_info
2891 )
2892
2893 if not is_mariadb:
2894 self._mariadb_normalized_version_info = server_version_info
2895
2896 if server_version_info < (5, 0, 2):
2897 raise NotImplementedError(
2898 "the MySQL/MariaDB dialect supports server "
2899 "version info 5.0.2 and above."
2900 )
2901
2902 # setting it here to help w the test suite
2903 self.server_version_info = server_version_info
2904 return server_version_info
2905
2906 def _set_mariadb(
2907 self, is_mariadb: Optional[bool], server_version_info: Tuple[int, ...]
2908 ) -> None:
2909 if is_mariadb is None:
2910 return
2911
2912 if not is_mariadb and self.is_mariadb:
2913 raise exc.InvalidRequestError(
2914 "MySQL version %s is not a MariaDB variant."
2915 % (".".join(map(str, server_version_info)),)
2916 )
2917 if is_mariadb:
2918
2919 if not issubclass(self.preparer, MariaDBIdentifierPreparer):
2920 self.preparer = MariaDBIdentifierPreparer
2921 # this would have been set by the default dialect already,
2922 # so set it again
2923 self.identifier_preparer = self.preparer(self)
2924
2925 # this will be updated on first connect in initialize()
2926 # if using older mariadb version
2927 self.delete_returning = True
2928 self.insert_returning = True
2929
2930 self.is_mariadb = is_mariadb
2931
2932 def do_begin_twophase(self, connection: Connection, xid: Any) -> None:
2933 connection.execute(sql.text("XA BEGIN :xid"), dict(xid=xid))
2934
2935 def do_prepare_twophase(self, connection: Connection, xid: Any) -> None:
2936 connection.execute(sql.text("XA END :xid"), dict(xid=xid))
2937 connection.execute(sql.text("XA PREPARE :xid"), dict(xid=xid))
2938
2939 def do_rollback_twophase(
2940 self,
2941 connection: Connection,
2942 xid: Any,
2943 is_prepared: bool = True,
2944 recover: bool = False,
2945 ) -> None:
2946 if not is_prepared:
2947 connection.execute(sql.text("XA END :xid"), dict(xid=xid))
2948 connection.execute(sql.text("XA ROLLBACK :xid"), dict(xid=xid))
2949
2950 def do_commit_twophase(
2951 self,
2952 connection: Connection,
2953 xid: Any,
2954 is_prepared: bool = True,
2955 recover: bool = False,
2956 ) -> None:
2957 if not is_prepared:
2958 self.do_prepare_twophase(connection, xid)
2959 connection.execute(sql.text("XA COMMIT :xid"), dict(xid=xid))
2960
2961 def do_recover_twophase(self, connection: Connection) -> List[Any]:
2962 resultset = connection.exec_driver_sql("XA RECOVER")
2963 return [
2964 row["data"][0 : row["gtrid_length"]]
2965 for row in resultset.mappings()
2966 ]
2967
2968 def is_disconnect(
2969 self,
2970 e: DBAPIModule.Error,
2971 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
2972 cursor: Optional[DBAPICursor],
2973 ) -> bool:
2974 if isinstance(
2975 e,
2976 (
2977 self.dbapi.OperationalError, # type: ignore
2978 self.dbapi.ProgrammingError, # type: ignore
2979 self.dbapi.InterfaceError, # type: ignore
2980 ),
2981 ) and self._extract_error_code(e) in (
2982 1927,
2983 2006,
2984 2013,
2985 2014,
2986 2045,
2987 2055,
2988 4031,
2989 ):
2990 return True
2991 elif isinstance(
2992 e, (self.dbapi.InterfaceError, self.dbapi.InternalError) # type: ignore # noqa: E501
2993 ):
2994 # if underlying connection is closed,
2995 # this is the error you get
2996 return "(0, '')" in str(e)
2997 else:
2998 return False
2999
3000 def _compat_fetchall(
3001 self, rp: CursorResult[Any], charset: Optional[str] = None
3002 ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]:
3003 """Proxy result rows to smooth over MySQL-Python driver
3004 inconsistencies."""
3005
3006 return [_DecodingRow(row, charset) for row in rp.fetchall()]
3007
3008 def _compat_fetchone(
3009 self, rp: CursorResult[Any], charset: Optional[str] = None
3010 ) -> Union[Row[Any], None, _DecodingRow]:
3011 """Proxy a result row to smooth over MySQL-Python driver
3012 inconsistencies."""
3013
3014 row = rp.fetchone()
3015 if row:
3016 return _DecodingRow(row, charset)
3017 else:
3018 return None
3019
3020 def _compat_first(
3021 self, rp: CursorResult[Any], charset: Optional[str] = None
3022 ) -> Optional[_DecodingRow]:
3023 """Proxy a result row to smooth over MySQL-Python driver
3024 inconsistencies."""
3025
3026 row = rp.first()
3027 if row:
3028 return _DecodingRow(row, charset)
3029 else:
3030 return None
3031
    def _extract_error_code(
        self, exception: DBAPIModule.Error
    ) -> Optional[int]:
        """Return the numeric error code carried by a DBAPI exception.

        This base implementation always raises ``NotImplementedError``.
        NOTE(review): presumably overridden per driver - confirm against
        the driver-specific dialects.

        """
        raise NotImplementedError()
3036
3037 def _get_default_schema_name(self, connection: Connection) -> str:
3038 return connection.exec_driver_sql("SELECT DATABASE()").scalar() # type: ignore[return-value] # noqa: E501
3039
    @reflection.cache
    def has_table(
        self,
        connection: Connection,
        table_name: str,
        schema: Optional[str] = None,
        **kw: Any,
    ) -> bool:
        """Return True if ``DESCRIBE`` succeeds for the given table.

        :param connection: connection on which ``DESCRIBE`` is executed.
        :param table_name: name of the table to look for.
        :param schema: schema name; ``default_schema_name`` is used when
         this is ``None``.
        :return: ``True`` if ``DESCRIBE`` returns at least one row,
         ``False`` if it fails with error code 1146, 1049 or 1051.
        :raises sqlalchemy.exc.DBAPIError: re-raised unchanged for any
         other error code.

        """
        self._ensure_has_table_connection(connection)

        if schema is None:
            schema = self.default_schema_name

        assert schema is not None

        full_name = ".".join(
            self.identifier_preparer._quote_free_identifiers(
                schema, table_name
            )
        )

        # DESCRIBE *must* be used because there is no information schema
        # table that returns information on temp tables that is consistently
        # available on MariaDB / MySQL / engine-agnostic etc.
        # therefore we have no choice but to use DESCRIBE and an error catch
        # to detect "False".  See issue #9058

        try:
            with connection.exec_driver_sql(
                f"DESCRIBE {full_name}",
                execution_options={"skip_user_error_events": True},
            ) as rs:
                return rs.fetchone() is not None
        except exc.DBAPIError as e:
            # https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html  # noqa: E501
            # there are a lot of codes that *may* pop up here at some point
            # but we continue to be fairly conservative.  We include:
            # 1146: Table '%s.%s' doesn't exist - what every MySQL has emitted
            # for decades
            #
            # mysql 8 suddenly started emitting:
            # 1049: Unknown database '%s'  - for nonexistent schema
            #
            # also added:
            # 1051: Unknown table '%s' - not known to emit
            #
            # there's more "doesn't exist" kinds of messages but they are
            # less clear if mysql 8 would suddenly start using one of those
            if self._extract_error_code(e.orig) in (1146, 1049, 1051):  # type: ignore  # noqa: E501
                return False
            raise
3091
3092 @reflection.cache
3093 def has_sequence(
3094 self,
3095 connection: Connection,
3096 sequence_name: str,
3097 schema: Optional[str] = None,
3098 **kw: Any,
3099 ) -> bool:
3100 if not self.supports_sequences:
3101 self._sequences_not_supported()
3102 if not schema:
3103 schema = self.default_schema_name
3104 # MariaDB implements sequences as a special type of table
3105 #
3106 cursor = connection.execute(
3107 sql.text(
3108 "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
3109 "WHERE TABLE_TYPE='SEQUENCE' and TABLE_NAME=:name AND "
3110 "TABLE_SCHEMA=:schema_name"
3111 ),
3112 dict(
3113 name=str(sequence_name),
3114 schema_name=str(schema),
3115 ),
3116 )
3117 return cursor.first() is not None
3118
3119 def _sequences_not_supported(self) -> NoReturn:
3120 raise NotImplementedError(
3121 "Sequences are supported only by the "
3122 "MariaDB series 10.3 or greater"
3123 )
3124
3125 @reflection.cache
3126 def get_sequence_names(
3127 self, connection: Connection, schema: Optional[str] = None, **kw: Any
3128 ) -> List[str]:
3129 if not self.supports_sequences:
3130 self._sequences_not_supported()
3131 if not schema:
3132 schema = self.default_schema_name
3133 # MariaDB implements sequences as a special type of table
3134 cursor = connection.execute(
3135 sql.text(
3136 "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
3137 "WHERE TABLE_TYPE='SEQUENCE' and TABLE_SCHEMA=:schema_name"
3138 ),
3139 dict(schema_name=schema),
3140 )
3141 return [
3142 row[0]
3143 for row in self._compat_fetchall(
3144 cursor, charset=self._connection_charset
3145 )
3146 ]
3147
    def initialize(self, connection: Connection) -> None:
        """Set up server-dependent dialect state on first connect.

        In order: detects the connection charset, runs the base dialect's
        ``initialize()``, detects the SQL mode, ANSI_QUOTES and identifier
        casing, rebuilds the identifier preparer when ANSI_QUOTES is on,
        derives the feature flags below from ``is_mariadb`` and
        ``server_version_info``, and finally emits warnings for known
        server issues.

        :param connection: the connection used for all detection queries.

        """
        # this is driver-based, does not need server version info
        # and is fairly critical for even basic SQL operations
        self._connection_charset: Optional[str] = self._detect_charset(
            connection
        )

        # call super().initialize() because we need to have
        # server_version_info set up.  in 1.4 under python 2 only this does the
        # "check unicode returns" thing, which is the one area that some
        # SQL gets compiled within initialize() currently
        default.DefaultDialect.initialize(self, connection)

        self._detect_sql_mode(connection)
        self._detect_ansiquotes(connection)  # depends on sql mode
        self._detect_casing(connection)
        if self._server_ansiquotes:
            # if ansiquotes == True, build a new IdentifierPreparer
            # with the new setting
            self.identifier_preparer = self.preparer(
                self, server_ansiquotes=self._server_ansiquotes
            )

        # MariaDB only, 10.3 and later
        self.supports_sequences = (
            self.is_mariadb and self.server_version_info >= (10, 3)
        )

        # MySQL only, 8 and later
        self.supports_for_update_of = (
            self._is_mysql and self.server_version_info >= (8,)
        )

        # MySQL only, 8.0.1 and later
        self.use_mysql_for_share = (
            self._is_mysql and self.server_version_info >= (8, 0, 1)
        )

        # enables the foreign key casing correction applied in
        # get_foreign_keys(); non-MariaDB, 8 and later
        self._needs_correct_for_88718_96365 = (
            not self.is_mariadb and self.server_version_info >= (8,)
        )

        # MariaDB only, 10.0.5 and later
        self.delete_returning = (
            self.is_mariadb and self.server_version_info >= (10, 0, 5)
        )

        # MariaDB only, 10.5 and later
        self.insert_returning = (
            self.is_mariadb and self.server_version_info >= (10, 5)
        )

        # MySQL only, 8.0.20 and later
        self._requires_alias_for_on_duplicate_key = (
            self._is_mysql and self.server_version_info >= (8, 0, 20)
        )

        self._warn_for_known_db_issues()
3200
3201 def _warn_for_known_db_issues(self) -> None:
3202 if self.is_mariadb:
3203 mdb_version = self._mariadb_normalized_version_info
3204 assert mdb_version is not None
3205 if mdb_version > (10, 2) and mdb_version < (10, 2, 9):
3206 util.warn(
3207 "MariaDB %r before 10.2.9 has known issues regarding "
3208 "CHECK constraints, which impact handling of NULL values "
3209 "with SQLAlchemy's boolean datatype (MDEV-13596). An "
3210 "additional issue prevents proper migrations of columns "
3211 "with CHECK constraints (MDEV-11114). Please upgrade to "
3212 "MariaDB 10.2.9 or greater, or use the MariaDB 10.1 "
3213 "series, to avoid these issues." % (mdb_version,)
3214 )
3215
3216 @property
3217 def _support_float_cast(self) -> bool:
3218 if not self.server_version_info:
3219 return False
3220 elif self.is_mariadb:
3221 # ref https://mariadb.com/kb/en/mariadb-1045-release-notes/
3222 return self.server_version_info >= (10, 4, 5)
3223 else:
3224 # ref https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-17.html#mysqld-8-0-17-feature # noqa
3225 return self.server_version_info >= (8, 0, 17)
3226
3227 @property
3228 def _support_default_function(self) -> bool:
3229 if not self.server_version_info:
3230 return False
3231 elif self.is_mariadb:
3232 # ref https://mariadb.com/kb/en/mariadb-1021-release-notes/
3233 return self.server_version_info >= (10, 2, 1)
3234 else:
3235 # ref https://dev.mysql.com/doc/refman/8.0/en/data-type-defaults.html # noqa
3236 return self.server_version_info >= (8, 0, 13)
3237
    @property
    def _is_mariadb(self) -> bool:
        """Read-only alias for the ``is_mariadb`` attribute."""
        return self.is_mariadb
3241
    @property
    def _is_mysql(self) -> bool:
        """True when the dialect is not flagged as MariaDB."""
        return not self.is_mariadb
3245
3246 @property
3247 def _is_mariadb_102(self) -> bool:
3248 return (
3249 self.is_mariadb
3250 and self._mariadb_normalized_version_info # type:ignore[operator]
3251 > (
3252 10,
3253 2,
3254 )
3255 )
3256
3257 @reflection.cache
3258 def get_schema_names(self, connection: Connection, **kw: Any) -> List[str]:
3259 rp = connection.exec_driver_sql("SHOW schemas")
3260 return [r[0] for r in rp]
3261
3262 @reflection.cache
3263 def get_table_names(
3264 self, connection: Connection, schema: Optional[str] = None, **kw: Any
3265 ) -> List[str]:
3266 """Return a Unicode SHOW TABLES from a given schema."""
3267 if schema is not None:
3268 current_schema: str = schema
3269 else:
3270 current_schema = self.default_schema_name # type: ignore
3271
3272 charset = self._connection_charset
3273
3274 rp = connection.exec_driver_sql(
3275 "SHOW FULL TABLES FROM %s"
3276 % self.identifier_preparer.quote_identifier(current_schema)
3277 )
3278
3279 return [
3280 row[0]
3281 for row in self._compat_fetchall(rp, charset=charset)
3282 if row[1] == "BASE TABLE"
3283 ]
3284
3285 @reflection.cache
3286 def get_view_names(
3287 self, connection: Connection, schema: Optional[str] = None, **kw: Any
3288 ) -> List[str]:
3289 if schema is None:
3290 schema = self.default_schema_name
3291 assert schema is not None
3292 charset = self._connection_charset
3293 rp = connection.exec_driver_sql(
3294 "SHOW FULL TABLES FROM %s"
3295 % self.identifier_preparer.quote_identifier(schema)
3296 )
3297 return [
3298 row[0]
3299 for row in self._compat_fetchall(rp, charset=charset)
3300 if row[1] in ("VIEW", "SYSTEM VIEW")
3301 ]
3302
3303 @reflection.cache
3304 def get_table_options(
3305 self,
3306 connection: Connection,
3307 table_name: str,
3308 schema: Optional[str] = None,
3309 **kw: Any,
3310 ) -> Dict[str, Any]:
3311 parsed_state = self._parsed_state_or_create(
3312 connection, table_name, schema, **kw
3313 )
3314 if parsed_state.table_options:
3315 return parsed_state.table_options
3316 else:
3317 return ReflectionDefaults.table_options()
3318
3319 @reflection.cache
3320 def get_columns(
3321 self,
3322 connection: Connection,
3323 table_name: str,
3324 schema: Optional[str] = None,
3325 **kw: Any,
3326 ) -> List[ReflectedColumn]:
3327 parsed_state = self._parsed_state_or_create(
3328 connection, table_name, schema, **kw
3329 )
3330 if parsed_state.columns:
3331 return parsed_state.columns
3332 else:
3333 return ReflectionDefaults.columns()
3334
3335 @reflection.cache
3336 def get_pk_constraint(
3337 self,
3338 connection: Connection,
3339 table_name: str,
3340 schema: Optional[str] = None,
3341 **kw: Any,
3342 ) -> ReflectedPrimaryKeyConstraint:
3343 parsed_state = self._parsed_state_or_create(
3344 connection, table_name, schema, **kw
3345 )
3346 for key in parsed_state.keys:
3347 if key["type"] == "PRIMARY":
3348 # There can be only one.
3349 cols = [s[0] for s in key["columns"]]
3350 return {"constrained_columns": cols, "name": None}
3351 return ReflectionDefaults.pk_constraint()
3352
3353 @reflection.cache
3354 def get_foreign_keys(
3355 self,
3356 connection: Connection,
3357 table_name: str,
3358 schema: Optional[str] = None,
3359 **kw: Any,
3360 ) -> List[ReflectedForeignKeyConstraint]:
3361 parsed_state = self._parsed_state_or_create(
3362 connection, table_name, schema, **kw
3363 )
3364 default_schema = None
3365
3366 fkeys: List[ReflectedForeignKeyConstraint] = []
3367
3368 for spec in parsed_state.fk_constraints:
3369 ref_name = spec["table"][-1]
3370 ref_schema = len(spec["table"]) > 1 and spec["table"][-2] or schema
3371
3372 if not ref_schema:
3373 if default_schema is None:
3374 default_schema = connection.dialect.default_schema_name
3375 if schema == default_schema:
3376 ref_schema = schema
3377
3378 loc_names = spec["local"]
3379 ref_names = spec["foreign"]
3380
3381 con_kw = {}
3382 for opt in ("onupdate", "ondelete"):
3383 if spec.get(opt, False) not in ("NO ACTION", None):
3384 con_kw[opt] = spec[opt]
3385
3386 fkey_d: ReflectedForeignKeyConstraint = {
3387 "name": spec["name"],
3388 "constrained_columns": loc_names,
3389 "referred_schema": ref_schema,
3390 "referred_table": ref_name,
3391 "referred_columns": ref_names,
3392 "options": con_kw,
3393 }
3394 fkeys.append(fkey_d)
3395
3396 if self._needs_correct_for_88718_96365:
3397 self._correct_for_mysql_bugs_88718_96365(fkeys, connection)
3398
3399 return fkeys if fkeys else ReflectionDefaults.foreign_keys()
3400
    def _correct_for_mysql_bugs_88718_96365(
        self,
        fkeys: List[ReflectedForeignKeyConstraint],
        connection: Connection,
    ) -> None:
        """Restore the real casing of referred names in foreign key records.

        Queries ``information_schema.columns`` for every referred
        schema / table / column in ``fkeys`` and overwrites
        ``referred_table``, ``referred_columns`` and (when it is not
        ``None``) ``referred_schema`` of each record, in place, with the
        values that query returned.

        :param fkeys: foreign key records to correct; mutated in place.
        :param connection: connection used for the lookup query.

        """
        # Foreign key is always in lower case (MySQL 8.0)
        # https://bugs.mysql.com/bug.php?id=88718
        # issue #4344 for SQLAlchemy

        # table name also for MySQL 8.0
        # https://bugs.mysql.com/bug.php?id=96365
        # issue #4751 for SQLAlchemy

        # for lower_case_table_names=2, information_schema.columns
        # preserves the original table/schema casing, but SHOW CREATE
        # TABLE does not.   this problem is not in lower_case_table_names=1,
        # but use case-insensitive matching for these two modes in any case.

        if self._casing in (1, 2):

            def lower(s: str) -> str:
                return s.lower()

        else:
            # if on case sensitive, there can be two tables referenced
            # with the same name different casing, so we need to use
            # case-sensitive matching.
            def lower(s: str) -> str:
                return s

        default_schema_name: str = connection.dialect.default_schema_name  # type: ignore  # noqa: E501

        # NOTE: using (table_schema, table_name, lower(column_name)) in (...)
        # is very slow since mysql does not seem able to properly use indexes.
        # Unpack the where condition instead.
        # maps schema -> table -> list of referred column names
        schema_by_table_by_column: DefaultDict[
            str, DefaultDict[str, List[str]]
        ] = DefaultDict(lambda: DefaultDict(list))
        for rec in fkeys:
            sch = lower(rec["referred_schema"] or default_schema_name)
            tbl = lower(rec["referred_table"])
            for col_name in rec["referred_columns"]:
                schema_by_table_by_column[sch][tbl].append(col_name)

        if schema_by_table_by_column:

            # one AND(schema, OR(AND(table, column IN ...), ...)) term per
            # schema, all OR-ed together
            condition = sql.or_(
                *(
                    sql.and_(
                        _info_columns.c.table_schema == schema,
                        sql.or_(
                            *(
                                sql.and_(
                                    _info_columns.c.table_name == table,
                                    sql.func.lower(
                                        _info_columns.c.column_name
                                    ).in_(columns),
                                )
                                for table, columns in tables.items()
                            )
                        ),
                    )
                    for schema, tables in schema_by_table_by_column.items()
                )
            )

            select = sql.select(
                _info_columns.c.table_schema,
                _info_columns.c.table_name,
                _info_columns.c.column_name,
            ).where(condition)

            correct_for_wrong_fk_case: CursorResult[Tuple[str, str, str]] = (
                connection.execute(select)
            )

            # in casing=0, table name and schema name come back in their
            # exact case.
            # in casing=1, table name and schema name come back in lower
            # case.
            # in casing=2, table name and schema name come back from the
            # information_schema.columns view in the case
            # that was used in CREATE DATABASE and CREATE TABLE, but
            # SHOW CREATE TABLE converts them to *lower case*, therefore
            # not matching.  So for this case, case-insensitive lookup
            # is necessary
            # keyed by the (possibly lower-cased) schema and table name; each
            # value holds the true schema name, the true table name and a
            # lower-cased-column -> true-column mapping
            d: DefaultDict[Tuple[str, str], Dict[str, str]] = defaultdict(dict)
            for schema, tname, cname in correct_for_wrong_fk_case:
                d[(lower(schema), lower(tname))]["SCHEMANAME"] = schema
                d[(lower(schema), lower(tname))]["TABLENAME"] = tname
                d[(lower(schema), lower(tname))][cname.lower()] = cname

            for fkey in fkeys:
                rec_b = d[
                    (
                        lower(fkey["referred_schema"] or default_schema_name),
                        lower(fkey["referred_table"]),
                    )
                ]

                fkey["referred_table"] = rec_b["TABLENAME"]
                # an unqualified reference stays unqualified
                if fkey["referred_schema"] is not None:
                    fkey["referred_schema"] = rec_b["SCHEMANAME"]

                fkey["referred_columns"] = [
                    rec_b[col.lower()] for col in fkey["referred_columns"]
                ]
3508
3509 @reflection.cache
3510 def get_check_constraints(
3511 self,
3512 connection: Connection,
3513 table_name: str,
3514 schema: Optional[str] = None,
3515 **kw: Any,
3516 ) -> List[ReflectedCheckConstraint]:
3517 parsed_state = self._parsed_state_or_create(
3518 connection, table_name, schema, **kw
3519 )
3520
3521 cks: List[ReflectedCheckConstraint] = [
3522 {"name": spec["name"], "sqltext": spec["sqltext"]}
3523 for spec in parsed_state.ck_constraints
3524 ]
3525 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
3526 return cks if cks else ReflectionDefaults.check_constraints()
3527
3528 @reflection.cache
3529 def get_table_comment(
3530 self,
3531 connection: Connection,
3532 table_name: str,
3533 schema: Optional[str] = None,
3534 **kw: Any,
3535 ) -> ReflectedTableComment:
3536 parsed_state = self._parsed_state_or_create(
3537 connection, table_name, schema, **kw
3538 )
3539 comment = parsed_state.table_options.get(f"{self.name}_comment", None)
3540 if comment is not None:
3541 return {"text": comment}
3542 else:
3543 return ReflectionDefaults.table_comment()
3544
3545 @reflection.cache
3546 def get_indexes(
3547 self,
3548 connection: Connection,
3549 table_name: str,
3550 schema: Optional[str] = None,
3551 **kw: Any,
3552 ) -> List[ReflectedIndex]:
3553 parsed_state = self._parsed_state_or_create(
3554 connection, table_name, schema, **kw
3555 )
3556
3557 indexes: List[ReflectedIndex] = []
3558
3559 for spec in parsed_state.keys:
3560 dialect_options = {}
3561 unique = False
3562 flavor = spec["type"]
3563 if flavor == "PRIMARY":
3564 continue
3565 if flavor == "UNIQUE":
3566 unique = True
3567 elif flavor in ("FULLTEXT", "SPATIAL"):
3568 dialect_options["%s_prefix" % self.name] = flavor
3569 elif flavor is not None:
3570 util.warn(
3571 "Converting unknown KEY type %s to a plain KEY", flavor
3572 )
3573
3574 if spec["parser"]:
3575 dialect_options["%s_with_parser" % (self.name)] = spec[
3576 "parser"
3577 ]
3578
3579 index_d: ReflectedIndex = {
3580 "name": spec["name"],
3581 "column_names": [s[0] for s in spec["columns"]],
3582 "unique": unique,
3583 }
3584
3585 mysql_length = {
3586 s[0]: s[1] for s in spec["columns"] if s[1] is not None
3587 }
3588 if mysql_length:
3589 dialect_options["%s_length" % self.name] = mysql_length
3590
3591 if flavor:
3592 index_d["type"] = flavor # type: ignore[typeddict-unknown-key]
3593
3594 if dialect_options:
3595 index_d["dialect_options"] = dialect_options
3596
3597 indexes.append(index_d)
3598 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
3599 return indexes if indexes else ReflectionDefaults.indexes()
3600
3601 @reflection.cache
3602 def get_unique_constraints(
3603 self,
3604 connection: Connection,
3605 table_name: str,
3606 schema: Optional[str] = None,
3607 **kw: Any,
3608 ) -> List[ReflectedUniqueConstraint]:
3609 parsed_state = self._parsed_state_or_create(
3610 connection, table_name, schema, **kw
3611 )
3612
3613 ucs: List[ReflectedUniqueConstraint] = [
3614 {
3615 "name": key["name"],
3616 "column_names": [col[0] for col in key["columns"]],
3617 "duplicates_index": key["name"],
3618 }
3619 for key in parsed_state.keys
3620 if key["type"] == "UNIQUE"
3621 ]
3622 ucs.sort(key=lambda d: d["name"] or "~") # sort None as last
3623 if ucs:
3624 return ucs
3625 else:
3626 return ReflectionDefaults.unique_constraints()
3627
3628 @reflection.cache
3629 def get_view_definition(
3630 self,
3631 connection: Connection,
3632 view_name: str,
3633 schema: Optional[str] = None,
3634 **kw: Any,
3635 ) -> str:
3636 charset = self._connection_charset
3637 full_name = ".".join(
3638 self.identifier_preparer._quote_free_identifiers(schema, view_name)
3639 )
3640 sql = self._show_create_table(
3641 connection, None, charset, full_name=full_name
3642 )
3643 if sql.upper().startswith("CREATE TABLE"):
3644 # it's a table, not a view
3645 raise exc.NoSuchTableError(full_name)
3646 return sql
3647
3648 def _parsed_state_or_create(
3649 self,
3650 connection: Connection,
3651 table_name: str,
3652 schema: Optional[str] = None,
3653 **kw: Any,
3654 ) -> _reflection.ReflectedState:
3655 return self._setup_parser(
3656 connection,
3657 table_name,
3658 schema,
3659 info_cache=kw.get("info_cache", None),
3660 )
3661
    @util.memoized_property
    def _tabledef_parser(self) -> _reflection.MySQLTableDefinitionParser:
        """return the MySQLTableDefinitionParser, generate if needed.

        The deferred creation ensures that the dialect has
        retrieved server version information first.

        The parser is built from this dialect and its current
        ``identifier_preparer``.

        """
        preparer = self.identifier_preparer
        return _reflection.MySQLTableDefinitionParser(self, preparer)
3672
3673 @reflection.cache
3674 def _setup_parser(
3675 self,
3676 connection: Connection,
3677 table_name: str,
3678 schema: Optional[str] = None,
3679 **kw: Any,
3680 ) -> _reflection.ReflectedState:
3681 charset = self._connection_charset
3682 parser = self._tabledef_parser
3683 full_name = ".".join(
3684 self.identifier_preparer._quote_free_identifiers(
3685 schema, table_name
3686 )
3687 )
3688 sql = self._show_create_table(
3689 connection, None, charset, full_name=full_name
3690 )
3691 if parser._check_view(sql):
3692 # Adapt views to something table-like.
3693 columns = self._describe_table(
3694 connection, None, charset, full_name=full_name
3695 )
3696 sql = parser._describe_to_create(
3697 table_name, columns # type: ignore[arg-type]
3698 )
3699 return parser.parse(sql, charset)
3700
3701 def _fetch_setting(
3702 self, connection: Connection, setting_name: str
3703 ) -> Optional[str]:
3704 charset = self._connection_charset
3705
3706 if self.server_version_info and self.server_version_info < (5, 6):
3707 sql = "SHOW VARIABLES LIKE '%s'" % setting_name
3708 fetch_col = 1
3709 else:
3710 sql = "SELECT @@%s" % setting_name
3711 fetch_col = 0
3712
3713 show_var = connection.exec_driver_sql(sql)
3714 row = self._compat_first(show_var, charset=charset)
3715 if not row:
3716 return None
3717 else:
3718 return cast(Optional[str], row[fetch_col])
3719
    def _detect_charset(self, connection: Connection) -> str:
        """Return the character set name in use on the connection.

        This base implementation always raises ``NotImplementedError``.
        NOTE(review): presumably overridden per driver - confirm against
        the driver-specific dialects.

        """
        raise NotImplementedError()
3722
3723 def _detect_casing(self, connection: Connection) -> int:
3724 """Sniff out identifier case sensitivity.
3725
3726 Cached per-connection. This value can not change without a server
3727 restart.
3728
3729 """
3730 # https://dev.mysql.com/doc/refman/en/identifier-case-sensitivity.html
3731
3732 setting = self._fetch_setting(connection, "lower_case_table_names")
3733 if setting is None:
3734 cs = 0
3735 else:
3736 # 4.0.15 returns OFF or ON according to [ticket:489]
3737 # 3.23 doesn't, 4.0.27 doesn't..
3738 if setting == "OFF":
3739 cs = 0
3740 elif setting == "ON":
3741 cs = 1
3742 else:
3743 cs = int(setting)
3744 self._casing = cs
3745 return cs
3746
3747 def _detect_collations(self, connection: Connection) -> Dict[str, str]:
3748 """Pull the active COLLATIONS list from the server.
3749
3750 Cached per-connection.
3751 """
3752
3753 collations = {}
3754 charset = self._connection_charset
3755 rs = connection.exec_driver_sql("SHOW COLLATION")
3756 for row in self._compat_fetchall(rs, charset):
3757 collations[row[0]] = row[1]
3758 return collations
3759
3760 def _detect_sql_mode(self, connection: Connection) -> None:
3761 setting = self._fetch_setting(connection, "sql_mode")
3762
3763 if setting is None:
3764 util.warn(
3765 "Could not retrieve SQL_MODE; please ensure the "
3766 "MySQL user has permissions to SHOW VARIABLES"
3767 )
3768 self._sql_mode = ""
3769 else:
3770 self._sql_mode = setting or ""
3771
3772 def _detect_ansiquotes(self, connection: Connection) -> None:
3773 """Detect and adjust for the ANSI_QUOTES sql mode."""
3774
3775 mode = self._sql_mode
3776 if not mode:
3777 mode = ""
3778 elif mode.isdigit():
3779 mode_no = int(mode)
3780 mode = (mode_no | 4 == mode_no) and "ANSI_QUOTES" or ""
3781
3782 self._server_ansiquotes = "ANSI_QUOTES" in mode
3783
3784 # as of MySQL 5.0.1
3785 self._backslash_escapes = "NO_BACKSLASH_ESCAPES" not in mode
3786
3787 @overload
3788 def _show_create_table(
3789 self,
3790 connection: Connection,
3791 table: Optional[Table],
3792 charset: Optional[str],
3793 full_name: str,
3794 ) -> str: ...
3795
3796 @overload
3797 def _show_create_table(
3798 self,
3799 connection: Connection,
3800 table: Table,
3801 charset: Optional[str] = None,
3802 full_name: None = None,
3803 ) -> str: ...
3804
3805 def _show_create_table(
3806 self,
3807 connection: Connection,
3808 table: Optional[Table],
3809 charset: Optional[str] = None,
3810 full_name: Optional[str] = None,
3811 ) -> str:
3812 """Run SHOW CREATE TABLE for a ``Table``."""
3813
3814 if full_name is None:
3815 assert table is not None
3816 full_name = self.identifier_preparer.format_table(table)
3817 st = "SHOW CREATE TABLE %s" % full_name
3818
3819 try:
3820 rp = connection.execution_options(
3821 skip_user_error_events=True
3822 ).exec_driver_sql(st)
3823 except exc.DBAPIError as e:
3824 if self._extract_error_code(e.orig) == 1146: # type: ignore[arg-type] # noqa: E501
3825 raise exc.NoSuchTableError(full_name) from e
3826 else:
3827 raise
3828 row = self._compat_first(rp, charset=charset)
3829 if not row:
3830 raise exc.NoSuchTableError(full_name)
3831 return cast(str, row[1]).strip()
3832
    @overload
    def _describe_table(
        self,
        connection: Connection,
        table: Optional[Table],
        charset: Optional[str],
        full_name: str,
    ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]: ...

    @overload
    def _describe_table(
        self,
        connection: Connection,
        table: Table,
        charset: Optional[str] = None,
        full_name: None = None,
    ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]: ...

    def _describe_table(
        self,
        connection: Connection,
        table: Optional[Table],
        charset: Optional[str] = None,
        full_name: Optional[str] = None,
    ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]:
        """Run DESCRIBE for a ``Table`` and return processed rows.

        :param connection: connection to execute on; user error events are
         skipped for this statement.
        :param table: ``Table`` whose formatted name is used when
         ``full_name`` is not given.
        :param charset: charset used to decode the result rows.
        :param full_name: already-quoted name to use verbatim.
        :raises sqlalchemy.exc.NoSuchTableError: on error code 1146.
        :raises sqlalchemy.exc.UnreflectableTableError: on error code 1356.

        """

        if full_name is None:
            assert table is not None
            full_name = self.identifier_preparer.format_table(table)
        st = "DESCRIBE %s" % full_name

        rp, rows = None, None
        try:
            try:
                rp = connection.execution_options(
                    skip_user_error_events=True
                ).exec_driver_sql(st)
            except exc.DBAPIError as e:
                code = self._extract_error_code(e.orig)  # type: ignore[arg-type]  # noqa: E501
                if code == 1146:
                    raise exc.NoSuchTableError(full_name) from e

                elif code == 1356:
                    raise exc.UnreflectableTableError(
                        "Table or view named %s could not be "
                        "reflected: %s" % (full_name, e)
                    ) from e

                else:
                    raise
            rows = self._compat_fetchall(rp, charset=charset)
        finally:
            # rp is still None if the statement itself failed
            if rp:
                rp.close()
        return rows
3889
3890
3891class _DecodingRow:
3892 """Return unicode-decoded values based on type inspection.
3893
3894 Smooth over data type issues (esp. with alpha driver versions) and
3895 normalize strings as Unicode regardless of user-configured driver
3896 encoding settings.
3897
3898 """
3899
3900 # Some MySQL-python versions can return some columns as
3901 # sets.Set(['value']) (seriously) but thankfully that doesn't
3902 # seem to come up in DDL queries.
3903
3904 _encoding_compat: Dict[str, str] = {
3905 "koi8r": "koi8_r",
3906 "koi8u": "koi8_u",
3907 "utf16": "utf-16-be", # MySQL's uft16 is always bigendian
3908 "utf8mb4": "utf8", # real utf8
3909 "utf8mb3": "utf8", # real utf8; saw this happen on CI but I cannot
3910 # reproduce, possibly mariadb10.6 related
3911 "eucjpms": "ujis",
3912 }
3913
3914 def __init__(self, rowproxy: Row[Any], charset: Optional[str]):
3915 self.rowproxy = rowproxy
3916 self.charset = (
3917 self._encoding_compat.get(charset, charset)
3918 if charset is not None
3919 else None
3920 )
3921
3922 def __getitem__(self, index: int) -> Any:
3923 item = self.rowproxy[index]
3924 if self.charset and isinstance(item, bytes):
3925 return item.decode(self.charset)
3926 else:
3927 return item
3928
3929 def __getattr__(self, attr: str) -> Any:
3930 item = getattr(self.rowproxy, attr)
3931 if self.charset and isinstance(item, bytes):
3932 return item.decode(self.charset)
3933 else:
3934 return item
3935
3936
# Lightweight table construct for ``information_schema.columns`` exposing
# only the schema, table and column name columns.  It is queried by
# ``_correct_for_mysql_bugs_88718_96365()`` to look up the real casing of
# names referred to by foreign keys.
_info_columns = sql.table(
    "columns",
    sql.column("table_schema", VARCHAR(64)),
    sql.column("table_name", VARCHAR(64)),
    sql.column("column_name", VARCHAR(64)),
    schema="information_schema",
)