1# dialects/mysql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: mysql
12 :name: MySQL / MariaDB
13 :normal_support: 5.6+ / 10+
14 :best_effort: 5.0.2+ / 5.0.2+
15
16Supported Versions and Features
17-------------------------------
18
19SQLAlchemy supports MySQL starting with version 5.0.2 through modern releases,
20as well as all modern versions of MariaDB. See the official MySQL
21documentation for detailed information about features supported in any given
22server release.
23
24.. versionchanged:: 1.4 minimum MySQL version supported is now 5.0.2.
25
26MariaDB Support
27~~~~~~~~~~~~~~~
28
29The MariaDB variant of MySQL retains fundamental compatibility with MySQL's
30protocols; however, the development of these two products continues to diverge.
31Within the realm of SQLAlchemy, the two databases have a small number of
32syntactical and behavioral differences that SQLAlchemy accommodates automatically.
33To connect to a MariaDB database, no changes to the database URL are required::
34
35
36 engine = create_engine(
37 "mysql+pymysql://user:pass@some_mariadb/dbname?charset=utf8mb4"
38 )
39
40Upon first connect, the SQLAlchemy dialect employs a
41server version detection scheme that determines if the
42backing database reports as MariaDB. Based on this flag, the dialect
43can make different choices in those areas where its behavior
44must be different.
45
46.. _mysql_mariadb_only_mode:
47
48MariaDB-Only Mode
49~~~~~~~~~~~~~~~~~
50
51The dialect also supports an **optional** "MariaDB-only" mode of connection, which may be
52useful for the case where an application makes use of MariaDB-specific features
53and is not compatible with a MySQL database. To use this mode of operation,
54replace the "mysql" token in the above URL with "mariadb"::
55
56 engine = create_engine(
57 "mariadb+pymysql://user:pass@some_mariadb/dbname?charset=utf8mb4"
58 )
59
60The above engine, upon first connect, will raise an error if the server version
61detection detects that the backing database is not MariaDB.
62
63When using an engine with ``"mariadb"`` as the dialect name, **all mysql-specific options
64that include the name "mysql" in them are now named with "mariadb"**. This means
65options like ``mysql_engine`` should be named ``mariadb_engine``, etc. Both
66"mysql" and "mariadb" options can be used simultaneously for applications that
67use URLs with both "mysql" and "mariadb" dialects::
68
69 my_table = Table(
70 "mytable",
71 metadata,
72 Column("id", Integer, primary_key=True),
73 Column("textdata", String(50)),
74 mariadb_engine="InnoDB",
75 mysql_engine="InnoDB",
76 )
77
78 Index(
79 "textdata_ix",
80 my_table.c.textdata,
81 mysql_prefix="FULLTEXT",
82 mariadb_prefix="FULLTEXT",
83 )
84
85Similar behavior will occur when the above structures are reflected, i.e. the
86"mariadb" prefix will be present in the option names when the database URL
87is based on the "mariadb" name.
88
89.. versionadded:: 1.4 Added "mariadb" dialect name supporting "MariaDB-only mode"
90 for the MySQL dialect.
91
92.. _mysql_connection_timeouts:
93
94Connection Timeouts and Disconnects
95-----------------------------------
96
97MySQL / MariaDB feature an automatic connection close behavior, for connections that
98have been idle for a fixed period of time, defaulting to eight hours.
99To avoid this issue, use
100the :paramref:`_sa.create_engine.pool_recycle` option which ensures that
101a connection will be discarded and replaced with a new one if it has been
102present in the pool for a fixed number of seconds::
103
104 engine = create_engine("mysql+mysqldb://...", pool_recycle=3600)
105
106For more comprehensive disconnect detection of pooled connections, including
107accommodation of server restarts and network issues, a pre-ping approach may
108be employed. See :ref:`pool_disconnects` for current approaches.
109
110.. seealso::
111
112 :ref:`pool_disconnects` - Background on several techniques for dealing
113 with timed out connections as well as database restarts.
114
115.. _mysql_storage_engines:
116
117CREATE TABLE arguments including Storage Engines
118------------------------------------------------
119
120Both MySQL's and MariaDB's CREATE TABLE syntax includes a wide array of special options,
121including ``ENGINE``, ``CHARSET``, ``MAX_ROWS``, ``ROW_FORMAT``,
122``INSERT_METHOD``, and many more.
123To accommodate the rendering of these arguments, specify the form
124``mysql_argument_name="value"``. For example, to specify a table with
125``ENGINE`` of ``InnoDB``, ``CHARSET`` of ``utf8mb4``, and ``KEY_BLOCK_SIZE``
126of ``1024``::
127
128 Table(
129 "mytable",
130 metadata,
131 Column("data", String(32)),
132 mysql_engine="InnoDB",
133 mysql_charset="utf8mb4",
134 mysql_key_block_size="1024",
135 )
136
137When supporting :ref:`mysql_mariadb_only_mode`, similar keys against
138the "mariadb" prefix must be included as well. The values can of course
139vary independently so that different settings on MySQL vs. MariaDB may
140be maintained::
141
142 # support both "mysql" and "mariadb-only" engine URLs
143
144 Table(
145 "mytable",
146 metadata,
147 Column("data", String(32)),
148 mysql_engine="InnoDB",
149 mariadb_engine="InnoDB",
150 mysql_charset="utf8mb4",
151 mariadb_charset="utf8",
152 mysql_key_block_size="1024",
153 mariadb_key_block_size="1024",
154 )
155
156The MySQL / MariaDB dialects will normally transfer any keyword specified as
157``mysql_keyword_name`` to be rendered as ``KEYWORD_NAME`` in the
158``CREATE TABLE`` statement. A handful of these names will render with a space
159instead of an underscore; to support this, the MySQL dialect has awareness of
160these particular names, which include ``DATA DIRECTORY``
161(e.g. ``mysql_data_directory``), ``CHARACTER SET`` (e.g.
162``mysql_character_set``) and ``INDEX DIRECTORY`` (e.g.
163``mysql_index_directory``).
164
165The most common argument is ``mysql_engine``, which refers to the storage
166engine for the table. Historically, MySQL server installations would default
167to ``MyISAM`` for this value, although newer versions may be defaulting
168to ``InnoDB``. The ``InnoDB`` engine is typically preferred for its support
169of transactions and foreign keys.
170
171A :class:`_schema.Table`
172that is created in a MySQL / MariaDB database with a storage engine
173of ``MyISAM`` will be essentially non-transactional, meaning any
174INSERT/UPDATE/DELETE statement referring to this table will be invoked as
175autocommit. It also will have no support for foreign key constraints; while
176the ``CREATE TABLE`` statement accepts foreign key options, when using the
177``MyISAM`` storage engine these arguments are discarded. Reflecting such a
178table will also produce no foreign key constraint information.
179
180For fully atomic transactions as well as support for foreign key
181constraints, all participating ``CREATE TABLE`` statements must specify a
182transactional engine, which in the vast majority of cases is ``InnoDB``.
183
184Partitioning can be specified using similar options.
185In the example below the create table will specify ``PARTITION_BY``,
186``PARTITIONS``, ``SUBPARTITIONS`` and ``SUBPARTITION_BY``::
187
188 # can also use mariadb_* prefix
189 Table(
190 "testtable",
191 MetaData(),
192 Column("id", Integer(), primary_key=True, autoincrement=True),
193 Column("other_id", Integer(), primary_key=True, autoincrement=False),
194 mysql_partitions="2",
195 mysql_partition_by="KEY(other_id)",
196 mysql_subpartition_by="HASH(some_expr)",
197 mysql_subpartitions="2",
198 )
199
200This will render:
201
202.. sourcecode:: sql
203
204 CREATE TABLE testtable (
205 id INTEGER NOT NULL AUTO_INCREMENT,
206 other_id INTEGER NOT NULL,
207 PRIMARY KEY (id, other_id)
208 )PARTITION BY KEY(other_id) PARTITIONS 2 SUBPARTITION BY HASH(some_expr) SUBPARTITIONS 2
209
210Case Sensitivity and Table Reflection
211-------------------------------------
212
213Both MySQL and MariaDB have inconsistent support for case-sensitive identifier
214names, basing support on specific details of the underlying
215operating system. However, it has been observed that no matter
216what case sensitivity behavior is present, the names of tables in
217foreign key declarations are *always* received from the database
218as all-lower case, making it impossible to accurately reflect a
219schema where inter-related tables use mixed-case identifier names.
220
221Therefore it is strongly advised that table names be declared as
222all lower case both within SQLAlchemy as well as on the MySQL / MariaDB
223database itself, especially if database reflection features are
224to be used.
225
226.. _mysql_isolation_level:
227
228Transaction Isolation Level
229---------------------------
230
231All MySQL / MariaDB dialects support setting of transaction isolation level both via a
232dialect-specific parameter :paramref:`_sa.create_engine.isolation_level`
233accepted
234by :func:`_sa.create_engine`, as well as the
235:paramref:`.Connection.execution_options.isolation_level` argument as passed to
236:meth:`_engine.Connection.execution_options`.
237This feature works by issuing the
238command ``SET SESSION TRANSACTION ISOLATION LEVEL <level>`` for each new
239connection. For the special AUTOCOMMIT isolation level, DBAPI-specific
240techniques are used.
241
242To set isolation level using :func:`_sa.create_engine`::
243
244 engine = create_engine(
245 "mysql+mysqldb://scott:tiger@localhost/test",
246 isolation_level="READ UNCOMMITTED",
247 )
248
249To set using per-connection execution options::
250
251 connection = engine.connect()
252 connection = connection.execution_options(isolation_level="READ COMMITTED")
253
254Valid values for ``isolation_level`` include:
255
256* ``READ COMMITTED``
257* ``READ UNCOMMITTED``
258* ``REPEATABLE READ``
259* ``SERIALIZABLE``
260* ``AUTOCOMMIT``
261
262The special ``AUTOCOMMIT`` value makes use of the various "autocommit"
263attributes provided by specific DBAPIs, and is currently supported by
264MySQLdb, MySQL-Client, MySQL-Connector Python, and PyMySQL. Using it,
265the database connection will return true for the value of
266``SELECT @@autocommit;``.
267
268There are also more options for isolation level configurations, such as
269"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
270different isolation level settings. See the discussion at
271:ref:`dbapi_autocommit` for background.
272
273.. seealso::
274
275 :ref:`dbapi_autocommit`
276
277AUTO_INCREMENT Behavior
278-----------------------
279
280When creating tables, SQLAlchemy will automatically set ``AUTO_INCREMENT`` on
281the first :class:`.Integer` primary key column which is not marked as a
282foreign key::
283
284 >>> t = Table(
285 ... "mytable", metadata, Column("mytable_id", Integer, primary_key=True)
286 ... )
287 >>> t.create()
288 CREATE TABLE mytable (
289 mytable_id INTEGER NOT NULL AUTO_INCREMENT,
290 PRIMARY KEY (mytable_id)
291 )
292
293You can disable this behavior by passing ``False`` to the
294:paramref:`_schema.Column.autoincrement` argument of :class:`_schema.Column`.
295This flag
296can also be used to enable auto-increment on a secondary column in a
297multi-column key for some storage engines::
298
299 Table(
300 "mytable",
301 metadata,
302 Column("gid", Integer, primary_key=True, autoincrement=False),
303 Column("id", Integer, primary_key=True),
304 )
305
306.. _mysql_ss_cursors:
307
308Server Side Cursors
309-------------------
310
311Server-side cursor support is available for the mysqlclient, PyMySQL,
312mariadbconnector dialects and may also be available in others. This makes use
313of either the "buffered=True/False" flag if available or by using a class such
314as ``MySQLdb.cursors.SSCursor`` or ``pymysql.cursors.SSCursor`` internally.
315
316
317Server side cursors are enabled on a per-statement basis by using the
318:paramref:`.Connection.execution_options.stream_results` connection execution
319option::
320
321 with engine.connect() as conn:
322 result = conn.execution_options(stream_results=True).execute(
323 text("select * from table")
324 )
325
326Note that some kinds of SQL statements may not be supported with
327server side cursors; generally, only SQL statements that return rows should be
328used with this option.
329
330.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
331 and will be removed in a future release. Please use the
332 :paramref:`.Connection.execution_options.stream_results` execution option for
333 unbuffered cursor support.
334
335.. seealso::
336
337 :ref:`engine_stream_results`
338
339.. _mysql_unicode:
340
341Unicode
342-------
343
344Charset Selection
345~~~~~~~~~~~~~~~~~
346
347Most MySQL / MariaDB DBAPIs offer the option to set the client character set for
348a connection. This is typically delivered using the ``charset`` parameter
349in the URL, such as::
350
351 e = create_engine(
352 "mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4"
353 )
354
355This charset is the **client character set** for the connection. Some
356MySQL DBAPIs will default this to a value such as ``latin1``, and some
357will make use of the ``default-character-set`` setting in the ``my.cnf``
358file as well. Documentation for the DBAPI in use should be consulted
359for specific behavior.
360
361The encoding used for Unicode has traditionally been ``'utf8'``. However, for
362MySQL versions 5.5.3 and MariaDB 5.5 on forward, a new MySQL-specific encoding
363``'utf8mb4'`` has been introduced, and as of MySQL 8.0 a warning is emitted by
364the server if plain ``utf8`` is specified within any server-side directives,
365replaced with ``utf8mb3``. The rationale for this new encoding is due to the
366fact that MySQL's legacy utf-8 encoding only supports codepoints up to three
367bytes instead of four. Therefore, when communicating with a MySQL or MariaDB
368database that includes codepoints more than three bytes in size, this new
369charset is preferred, if supported by both the database as well as the client
370DBAPI, as in::
371
372 e = create_engine(
373 "mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4"
374 )
375
376All modern DBAPIs should support the ``utf8mb4`` charset.
377
378In order to use ``utf8mb4`` encoding for a schema that was created with legacy
379``utf8``, changes to the MySQL/MariaDB schema and/or server configuration may be
380required.
381
382.. seealso::
383
384 `The utf8mb4 Character Set \
385 <https://dev.mysql.com/doc/refman/5.5/en/charset-unicode-utf8mb4.html>`_ - \
386 in the MySQL documentation
387
388.. _mysql_binary_introducer:
389
390Dealing with Binary Data Warnings and Unicode
391~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
392
393MySQL versions 5.6, 5.7 and later (not MariaDB at the time of this writing) now
394emit a warning when attempting to pass binary data to the database, while a
395character set encoding is also in place, when the binary data itself is not
396valid for that encoding:
397
398.. sourcecode:: text
399
400 default.py:509: Warning: (1300, "Invalid utf8mb4 character string:
401 'F9876A'")
402 cursor.execute(statement, parameters)
403
404This warning is due to the fact that the MySQL client library is attempting to
405interpret the binary string as a unicode object even if a datatype such
406as :class:`.LargeBinary` is in use. To resolve this, the SQL statement requires
407a binary "character set introducer" be present before any non-NULL value
408that renders like this:
409
410.. sourcecode:: sql
411
412 INSERT INTO table (data) VALUES (_binary %s)
413
414These character set introducers are provided by the DBAPI driver, assuming the
415use of mysqlclient or PyMySQL (both of which are recommended). Add the query
416string parameter ``binary_prefix=true`` to the URL to repair this warning::
417
418 # mysqlclient
419 engine = create_engine(
420 "mysql+mysqldb://scott:tiger@localhost/test?charset=utf8mb4&binary_prefix=true"
421 )
422
423 # PyMySQL
424 engine = create_engine(
425 "mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4&binary_prefix=true"
426 )
427
428The ``binary_prefix`` flag may or may not be supported by other MySQL drivers.
429
430SQLAlchemy itself cannot render this ``_binary`` prefix reliably, as it does
431not work with the NULL value, which is valid to be sent as a bound parameter.
432As the MySQL driver renders parameters directly into the SQL string, it's the
433most efficient place for this additional keyword to be passed.
434
435.. seealso::
436
437 `Character set introducers <https://dev.mysql.com/doc/refman/5.7/en/charset-introducer.html>`_ - on the MySQL website
438
439
440ANSI Quoting Style
441------------------
442
443MySQL / MariaDB feature two varieties of identifier "quoting style", one using
444backticks and the other using quotes, e.g. ```some_identifier``` vs.
445``"some_identifier"``. All MySQL dialects detect which version
446is in use by checking the value of :ref:`sql_mode<mysql_sql_mode>` when a connection is first
447established with a particular :class:`_engine.Engine`.
448This quoting style comes
449into play when rendering table and column names as well as when reflecting
450existing database structures. The detection is entirely automatic and
451no special configuration is needed to use either quoting style.
452
453
454.. _mysql_sql_mode:
455
456Changing the sql_mode
457---------------------
458
459MySQL supports operating in multiple
460`Server SQL Modes <https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html>`_ for
461both Servers and Clients. To change the ``sql_mode`` for a given application, a
462developer can leverage SQLAlchemy's Events system.
463
464In the following example, the event system is used to set the ``sql_mode`` on
465the ``connect`` event::
466
467 from sqlalchemy import create_engine, event
468
469 eng = create_engine(
470 "mysql+mysqldb://scott:tiger@localhost/test", echo="debug"
471 )
472
473
474 # `insert=True` will ensure this is the very first listener to run
475 @event.listens_for(eng, "connect", insert=True)
476 def connect(dbapi_connection, connection_record):
477 cursor = dbapi_connection.cursor()
478 cursor.execute("SET sql_mode = 'STRICT_ALL_TABLES'")
479
480
481 conn = eng.connect()
482
483In the example illustrated above, the "connect" event will invoke the "SET"
484statement on the connection at the moment a particular DBAPI connection is
485first created for a given Pool, before the connection is made available to the
486connection pool. Additionally, because the function was registered with
487``insert=True``, it will be prepended to the internal list of registered
488functions.
489
490
491MySQL / MariaDB SQL Extensions
492------------------------------
493
494Many of the MySQL / MariaDB SQL extensions are handled through SQLAlchemy's generic
495function and operator support::
496
497 table.select(table.c.password == func.md5("plaintext"))
498 table.select(table.c.username.op("regexp")("^[a-d]"))
499
500And of course any valid SQL statement can be executed as a string as well.
501
502Some limited direct support for MySQL / MariaDB extensions to SQL is currently
503available.
504
505* INSERT..ON DUPLICATE KEY UPDATE: See
506 :ref:`mysql_insert_on_duplicate_key_update`
507
508* SELECT pragma, use :meth:`_expression.Select.prefix_with` and
509 :meth:`_query.Query.prefix_with`::
510
511 select(...).prefix_with("HIGH_PRIORITY", "SQL_SMALL_RESULT")
512
513* UPDATE with LIMIT::
514
515 update(...).with_dialect_options(mysql_limit=10, mariadb_limit=10)
516
517* DELETE
518 with LIMIT::
519
520 delete(...).with_dialect_options(mysql_limit=10, mariadb_limit=10)
521
522 .. versionadded:: 2.0.37 Added delete with limit
523
524* optimizer hints, use :meth:`_expression.Select.prefix_with` and
525 :meth:`_query.Query.prefix_with`::
526
527 select(...).prefix_with("/*+ NO_RANGE_OPTIMIZATION(t4 PRIMARY) */")
528
529* index hints, use :meth:`_expression.Select.with_hint` and
530 :meth:`_query.Query.with_hint`::
531
532 select(...).with_hint(some_table, "USE INDEX xyz")
533
534* MATCH
535 operator support::
536
537 from sqlalchemy.dialects.mysql import match
538
539 select(...).where(match(col1, col2, against="some expr").in_boolean_mode())
540
541 .. seealso::
542
543 :class:`_mysql.match`
544
545INSERT/DELETE...RETURNING
546-------------------------
547
548The MariaDB dialect supports 10.5+'s ``INSERT..RETURNING`` and
549``DELETE..RETURNING`` (10.0+) syntaxes. ``INSERT..RETURNING`` may be used
550automatically in some cases in order to fetch newly generated identifiers in
551place of the traditional approach of using ``cursor.lastrowid``, however
552``cursor.lastrowid`` is currently still preferred for simple single-statement
553cases for its better performance.
554
555To specify an explicit ``RETURNING`` clause, use the
556:meth:`._UpdateBase.returning` method on a per-statement basis::
557
558 # INSERT..RETURNING
559 result = connection.execute(
560 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
561 )
562 print(result.all())
563
564 # DELETE..RETURNING
565 result = connection.execute(
566 table.delete()
567 .where(table.c.name == "foo")
568 .returning(table.c.col1, table.c.col2)
569 )
570 print(result.all())
571
572.. versionadded:: 2.0 Added support for MariaDB RETURNING
573
574.. _mysql_insert_on_duplicate_key_update:
575
576INSERT...ON DUPLICATE KEY UPDATE (Upsert)
577------------------------------------------
578
579MySQL / MariaDB allow "upserts" (update or insert)
580of rows into a table via the ``ON DUPLICATE KEY UPDATE`` clause of the
581``INSERT`` statement. A candidate row will only be inserted if that row does
582not match an existing primary or unique key in the table; otherwise, an UPDATE
583will be performed. The statement allows for separate specification of the
584values to INSERT versus the values for UPDATE.
585
586SQLAlchemy provides ``ON DUPLICATE KEY UPDATE`` support via the MySQL-specific
587:func:`.mysql.insert()` function, which provides
588the generative method :meth:`~.mysql.Insert.on_duplicate_key_update`:
589
590.. sourcecode:: pycon+sql
591
592 >>> from sqlalchemy.dialects.mysql import insert
593
594 >>> insert_stmt = insert(my_table).values(
595 ... id="some_existing_id", data="inserted value"
596 ... )
597
598 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
599 ... data=insert_stmt.inserted.data, status="U"
600 ... )
601 >>> print(on_duplicate_key_stmt)
602 {printsql}INSERT INTO my_table (id, data) VALUES (%s, %s)
603 ON DUPLICATE KEY UPDATE data = VALUES(data), status = %s
604
605
606Unlike PostgreSQL's "ON CONFLICT" phrase, the "ON DUPLICATE KEY UPDATE"
607phrase will always match on any primary key or unique key, and will always
608perform an UPDATE if there's a match; there are no options for it to raise
609an error or to skip performing an UPDATE.
610
611``ON DUPLICATE KEY UPDATE`` is used to perform an update of the already
612existing row, using any combination of new values as well as values
613from the proposed insertion. These values are normally specified using
614keyword arguments passed to the
615:meth:`_mysql.Insert.on_duplicate_key_update` method,
616with column key values (usually the name of the column, unless it
617specifies :paramref:`_schema.Column.key`)
618as keys and literal values or SQL expressions
619as values:
620
621.. sourcecode:: pycon+sql
622
623 >>> insert_stmt = insert(my_table).values(
624 ... id="some_existing_id", data="inserted value"
625 ... )
626
627 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
628 ... data="some data",
629 ... updated_at=func.current_timestamp(),
630 ... )
631
632 >>> print(on_duplicate_key_stmt)
633 {printsql}INSERT INTO my_table (id, data) VALUES (%s, %s)
634 ON DUPLICATE KEY UPDATE data = %s, updated_at = CURRENT_TIMESTAMP
635
636In a manner similar to that of :meth:`.UpdateBase.values`, other parameter
637forms are accepted, including a single dictionary:
638
639.. sourcecode:: pycon+sql
640
641 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
642 ... {"data": "some data", "updated_at": func.current_timestamp()},
643 ... )
644
645as well as a list of 2-tuples, which will automatically provide
646a parameter-ordered UPDATE statement in a manner similar to that described
647at :ref:`tutorial_parameter_ordered_updates`. Unlike the :class:`_expression.Update`
648object,
649no special flag is needed to specify the intent since the argument form in
650this context is unambiguous:
651
652.. sourcecode:: pycon+sql
653
654 >>> on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
655 ... [
656 ... ("data", "some data"),
657 ... ("updated_at", func.current_timestamp()),
658 ... ]
659 ... )
660
661 >>> print(on_duplicate_key_stmt)
662 {printsql}INSERT INTO my_table (id, data) VALUES (%s, %s)
663 ON DUPLICATE KEY UPDATE data = %s, updated_at = CURRENT_TIMESTAMP
664
665.. versionchanged:: 1.3 support for parameter-ordered UPDATE clause within
666 MySQL ON DUPLICATE KEY UPDATE
667
668.. warning::
669
670 The :meth:`_mysql.Insert.on_duplicate_key_update`
671 method does **not** take into
672 account Python-side default UPDATE values or generation functions, e.g.
673 those specified using :paramref:`_schema.Column.onupdate`.
674 These values will not be exercised for an ON DUPLICATE KEY style of UPDATE,
675 unless they are manually specified explicitly in the parameters.
676
677
678
679In order to refer to the proposed insertion row, the special alias
680:attr:`_mysql.Insert.inserted` is available as an attribute on
681the :class:`_mysql.Insert` object; this object is a
682:class:`_expression.ColumnCollection` which contains all columns of the target
683table:
684
685.. sourcecode:: pycon+sql
686
687 >>> stmt = insert(my_table).values(
688 ... id="some_id", data="inserted value", author="jlh"
689 ... )
690
691 >>> do_update_stmt = stmt.on_duplicate_key_update(
692 ... data="updated value", author=stmt.inserted.author
693 ... )
694
695 >>> print(do_update_stmt)
696 {printsql}INSERT INTO my_table (id, data, author) VALUES (%s, %s, %s)
697 ON DUPLICATE KEY UPDATE data = %s, author = VALUES(author)
698
699When rendered, the "inserted" namespace will produce the expression
700``VALUES(<columnname>)``.
701
702.. versionadded:: 1.2 Added support for MySQL ON DUPLICATE KEY UPDATE clause
703
704
705
706rowcount Support
707----------------
708
709SQLAlchemy standardizes the DBAPI ``cursor.rowcount`` attribute to be the
710usual definition of "number of rows matched by an UPDATE or DELETE" statement.
711This is in contradiction to the default setting on most MySQL DBAPI drivers,
712which is "number of rows actually modified/deleted". For this reason, the
713SQLAlchemy MySQL dialects always add the ``constants.CLIENT.FOUND_ROWS``
714flag, or whatever is equivalent for the target dialect, upon connection.
715This setting is currently hardcoded.
716
717.. seealso::
718
719 :attr:`_engine.CursorResult.rowcount`
720
721
722.. _mysql_indexes:
723
724MySQL / MariaDB-Specific Index Options
725-----------------------------------------
726
727MySQL and MariaDB-specific extensions to the :class:`.Index` construct are available.
728
729Index Length
730~~~~~~~~~~~~~
731
732MySQL and MariaDB both provide an option to create index entries with a certain length, where
733"length" refers to the number of characters or bytes in each value which will
734become part of the index. SQLAlchemy provides this feature via the
735``mysql_length`` and/or ``mariadb_length`` parameters::
736
737 Index("my_index", my_table.c.data, mysql_length=10, mariadb_length=10)
738
739 Index("a_b_idx", my_table.c.a, my_table.c.b, mysql_length={"a": 4, "b": 9})
740
741 Index(
742 "a_b_idx", my_table.c.a, my_table.c.b, mariadb_length={"a": 4, "b": 9}
743 )
744
745Prefix lengths are given in characters for nonbinary string types and in bytes
746for binary string types. The value passed to the keyword argument *must* be
747either an integer (and, thus, specify the same prefix length value for all
748columns of the index) or a dict in which keys are column names and values are
749prefix length values for corresponding columns. MySQL and MariaDB only allow a
750length for a column of an index if it is a CHAR, VARCHAR, TEXT, BINARY,
751VARBINARY or BLOB column.
752
753Index Prefixes
754~~~~~~~~~~~~~~
755
756MySQL storage engines permit you to specify an index prefix when creating
757an index. SQLAlchemy provides this feature via the
758``mysql_prefix`` parameter on :class:`.Index`::
759
760 Index("my_index", my_table.c.data, mysql_prefix="FULLTEXT")
761
762The value passed to the keyword argument will be simply passed through to the
763underlying CREATE INDEX, so it *must* be a valid index prefix for your MySQL
764storage engine.
765
766.. seealso::
767
768 `CREATE INDEX <https://dev.mysql.com/doc/refman/5.0/en/create-index.html>`_ - MySQL documentation
769
770Index Types
771~~~~~~~~~~~~~
772
773Some MySQL storage engines permit you to specify an index type when creating
774an index or primary key constraint. SQLAlchemy provides this feature via the
775``mysql_using`` parameter on :class:`.Index`::
776
777 Index(
778 "my_index", my_table.c.data, mysql_using="hash", mariadb_using="hash"
779 )
780
781As well as the ``mysql_using`` parameter on :class:`.PrimaryKeyConstraint`::
782
783 PrimaryKeyConstraint("data", mysql_using="hash", mariadb_using="hash")
784
785The value passed to the keyword argument will be simply passed through to the
786underlying CREATE INDEX or PRIMARY KEY clause, so it *must* be a valid index
787type for your MySQL storage engine.
788
789More information can be found at:
790
791https://dev.mysql.com/doc/refman/5.0/en/create-index.html
792
793https://dev.mysql.com/doc/refman/5.0/en/create-table.html
794
795Index Parsers
796~~~~~~~~~~~~~
797
798CREATE FULLTEXT INDEX in MySQL also supports a "WITH PARSER" option. This
799is available using the keyword argument ``mysql_with_parser``::
800
801 Index(
802 "my_index",
803 my_table.c.data,
804 mysql_prefix="FULLTEXT",
805 mysql_with_parser="ngram",
806 mariadb_prefix="FULLTEXT",
807 mariadb_with_parser="ngram",
808 )
809
810.. versionadded:: 1.3
811
812
813.. _mysql_foreign_keys:
814
815MySQL / MariaDB Foreign Keys
816-----------------------------
817
818MySQL and MariaDB's behavior regarding foreign keys has some important caveats.
819
820Foreign Key Arguments to Avoid
821~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
822
823Neither MySQL nor MariaDB supports the foreign key arguments "DEFERRABLE", "INITIALLY",
824or "MATCH". Using the ``deferrable`` or ``initially`` keyword argument with
825:class:`_schema.ForeignKeyConstraint` or :class:`_schema.ForeignKey`
826will have the effect of
827these keywords being rendered in a DDL expression, which will then raise an
828error on MySQL or MariaDB. In order to use these keywords on a foreign key while having
829them ignored on a MySQL / MariaDB backend, use a custom compile rule::
830
831 from sqlalchemy.ext.compiler import compiles
832 from sqlalchemy.schema import ForeignKeyConstraint
833
834
835 @compiles(ForeignKeyConstraint, "mysql", "mariadb")
836 def process(element, compiler, **kw):
837 element.deferrable = element.initially = None
838 return compiler.visit_foreign_key_constraint(element, **kw)
839
840The "MATCH" keyword is in fact more insidious, and is explicitly disallowed
841by SQLAlchemy in conjunction with the MySQL or MariaDB backends. This argument is
842silently ignored by MySQL / MariaDB, but in addition has the effect of ON UPDATE and ON
843DELETE options also being ignored by the backend. Therefore MATCH should
844never be used with the MySQL / MariaDB backends; as is the case with DEFERRABLE and
845INITIALLY, custom compilation rules can be used to correct a
846ForeignKeyConstraint at DDL definition time.
847
848Reflection of Foreign Key Constraints
849~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
850
851Not all MySQL / MariaDB storage engines support foreign keys. When using the
852very common ``MyISAM`` MySQL storage engine, the information loaded by table
853reflection will not include foreign keys. For these tables, you may supply a
854:class:`~sqlalchemy.ForeignKeyConstraint` at reflection time::
855
856 Table(
857 "mytable",
858 metadata,
859 ForeignKeyConstraint(["other_id"], ["othertable.other_id"]),
860 autoload_with=engine,
861 )
862
863.. seealso::
864
865 :ref:`mysql_storage_engines`
866
867.. _mysql_unique_constraints:
868
869MySQL / MariaDB Unique Constraints and Reflection
870----------------------------------------------------
871
872SQLAlchemy supports both the :class:`.Index` construct with the
873flag ``unique=True``, indicating a UNIQUE index, as well as the
874:class:`.UniqueConstraint` construct, representing a UNIQUE constraint.
875Both objects/syntaxes are supported by MySQL / MariaDB when emitting DDL to create
876these constraints. However, MySQL / MariaDB does not have a unique constraint
877construct that is separate from a unique index; that is, the "UNIQUE"
878constraint on MySQL / MariaDB is equivalent to creating a "UNIQUE INDEX".
879
880When reflecting these constructs, the
881:meth:`_reflection.Inspector.get_indexes`
882and the :meth:`_reflection.Inspector.get_unique_constraints`
883methods will **both**
884return an entry for a UNIQUE index in MySQL / MariaDB. However, when performing
885full table reflection using ``Table(..., autoload_with=engine)``,
886the :class:`.UniqueConstraint` construct is
887**not** part of the fully reflected :class:`_schema.Table` construct under any
circumstances; this construct is always represented by an :class:`.Index`
889with the ``unique=True`` setting present in the :attr:`_schema.Table.indexes`
890collection.
891
892
893TIMESTAMP / DATETIME issues
894---------------------------
895
896.. _mysql_timestamp_onupdate:
897
Rendering ON UPDATE CURRENT_TIMESTAMP for MySQL / MariaDB's explicit_defaults_for_timestamp
899~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
900
901MySQL / MariaDB have historically expanded the DDL for the :class:`_types.TIMESTAMP`
902datatype into the phrase "TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE
903CURRENT_TIMESTAMP", which includes non-standard SQL that automatically updates
904the column with the current timestamp when an UPDATE occurs, eliminating the
905usual need to use a trigger in such a case where server-side update changes are
906desired.
907
908MySQL 5.6 introduced a new flag `explicit_defaults_for_timestamp
909<https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html
910#sysvar_explicit_defaults_for_timestamp>`_ which disables the above behavior,
911and in MySQL 8 this flag defaults to true, meaning in order to get a MySQL
912"on update timestamp" without changing this flag, the above DDL must be
913rendered explicitly. Additionally, the same DDL is valid for use of the
914``DATETIME`` datatype as well.
915
916SQLAlchemy's MySQL dialect does not yet have an option to generate
917MySQL's "ON UPDATE CURRENT_TIMESTAMP" clause, noting that this is not a general
918purpose "ON UPDATE" as there is no such syntax in standard SQL. SQLAlchemy's
919:paramref:`_schema.Column.server_onupdate` parameter is currently not related
920to this special MySQL behavior.
921
922To generate this DDL, make use of the :paramref:`_schema.Column.server_default`
923parameter and pass a textual clause that also includes the ON UPDATE clause::
924
925 from sqlalchemy import Table, MetaData, Column, Integer, String, TIMESTAMP
926 from sqlalchemy import text
927
928 metadata = MetaData()
929
930 mytable = Table(
931 "mytable",
932 metadata,
933 Column("id", Integer, primary_key=True),
934 Column("data", String(50)),
935 Column(
936 "last_updated",
937 TIMESTAMP,
938 server_default=text(
939 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
940 ),
941 ),
942 )
943
944The same instructions apply to use of the :class:`_types.DateTime` and
945:class:`_types.DATETIME` datatypes::
946
947 from sqlalchemy import DateTime
948
949 mytable = Table(
950 "mytable",
951 metadata,
952 Column("id", Integer, primary_key=True),
953 Column("data", String(50)),
954 Column(
955 "last_updated",
956 DateTime,
957 server_default=text(
958 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
959 ),
960 ),
961 )
962
963Even though the :paramref:`_schema.Column.server_onupdate` feature does not
964generate this DDL, it still may be desirable to signal to the ORM that this
965updated value should be fetched. This syntax looks like the following::
966
967 from sqlalchemy.schema import FetchedValue
968
969
970 class MyClass(Base):
971 __tablename__ = "mytable"
972
973 id = Column(Integer, primary_key=True)
974 data = Column(String(50))
975 last_updated = Column(
976 TIMESTAMP,
977 server_default=text(
978 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
979 ),
980 server_onupdate=FetchedValue(),
981 )
982
983.. _mysql_timestamp_null:
984
985TIMESTAMP Columns and NULL
986~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
987
988MySQL historically enforces that a column which specifies the
989TIMESTAMP datatype implicitly includes a default value of
990CURRENT_TIMESTAMP, even though this is not stated, and additionally
991sets the column as NOT NULL, the opposite behavior vs. that of all
992other datatypes:
993
994.. sourcecode:: text
995
996 mysql> CREATE TABLE ts_test (
997 -> a INTEGER,
998 -> b INTEGER NOT NULL,
999 -> c TIMESTAMP,
1000 -> d TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
1001 -> e TIMESTAMP NULL);
1002 Query OK, 0 rows affected (0.03 sec)
1003
1004 mysql> SHOW CREATE TABLE ts_test;
1005 +---------+-----------------------------------------------------
1006 | Table | Create Table
1007 +---------+-----------------------------------------------------
1008 | ts_test | CREATE TABLE `ts_test` (
1009 `a` int(11) DEFAULT NULL,
1010 `b` int(11) NOT NULL,
1011 `c` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
1012 `d` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
1013 `e` timestamp NULL DEFAULT NULL
1014 ) ENGINE=MyISAM DEFAULT CHARSET=latin1
1015
1016Above, we see that an INTEGER column defaults to NULL, unless it is specified
1017with NOT NULL. But when the column is of type TIMESTAMP, an implicit
1018default of CURRENT_TIMESTAMP is generated which also coerces the column
to be NOT NULL, even though we did not specify it as such.
1020
1021This behavior of MySQL can be changed on the MySQL side using the
1022`explicit_defaults_for_timestamp
1023<https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html
1024#sysvar_explicit_defaults_for_timestamp>`_ configuration flag introduced in
1025MySQL 5.6. With this server setting enabled, TIMESTAMP columns behave like
any other datatype on the MySQL side with regard to defaults and nullability.
1027
1028However, to accommodate the vast majority of MySQL databases that do not
1029specify this new flag, SQLAlchemy emits the "NULL" specifier explicitly with
1030any TIMESTAMP column that does not specify ``nullable=False``. In order to
1031accommodate newer databases that specify ``explicit_defaults_for_timestamp``,
1032SQLAlchemy also emits NOT NULL for TIMESTAMP columns that do specify
1033``nullable=False``. The following example illustrates::
1034
1035 from sqlalchemy import MetaData, Integer, Table, Column, text
1036 from sqlalchemy.dialects.mysql import TIMESTAMP
1037
1038 m = MetaData()
1039 t = Table(
1040 "ts_test",
1041 m,
1042 Column("a", Integer),
1043 Column("b", Integer, nullable=False),
1044 Column("c", TIMESTAMP),
1045 Column("d", TIMESTAMP, nullable=False),
1046 )
1047
1048
1049 from sqlalchemy import create_engine
1050
1051 e = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo=True)
1052 m.create_all(e)
1053
1054output:
1055
1056.. sourcecode:: sql
1057
1058 CREATE TABLE ts_test (
1059 a INTEGER,
1060 b INTEGER NOT NULL,
1061 c TIMESTAMP NULL,
1062 d TIMESTAMP NOT NULL
1063 )
1064
1065""" # noqa
1066from __future__ import annotations
1067
1068from collections import defaultdict
1069from itertools import compress
1070import re
1071from typing import Any
1072from typing import Callable
1073from typing import cast
1074from typing import DefaultDict
1075from typing import Dict
1076from typing import List
1077from typing import NoReturn
1078from typing import Optional
1079from typing import overload
1080from typing import Sequence
1081from typing import Tuple
1082from typing import TYPE_CHECKING
1083from typing import Union
1084
1085from . import reflection as _reflection
1086from .enumerated import ENUM
1087from .enumerated import SET
1088from .json import JSON
1089from .json import JSONIndexType
1090from .json import JSONPathType
1091from .reserved_words import RESERVED_WORDS_MARIADB
1092from .reserved_words import RESERVED_WORDS_MYSQL
1093from .types import _FloatType
1094from .types import _IntegerType
1095from .types import _MatchType
1096from .types import _NumericType
1097from .types import _StringType
1098from .types import BIGINT
1099from .types import BIT
1100from .types import CHAR
1101from .types import DATETIME
1102from .types import DECIMAL
1103from .types import DOUBLE
1104from .types import FLOAT
1105from .types import INTEGER
1106from .types import LONGBLOB
1107from .types import LONGTEXT
1108from .types import MEDIUMBLOB
1109from .types import MEDIUMINT
1110from .types import MEDIUMTEXT
1111from .types import NCHAR
1112from .types import NUMERIC
1113from .types import NVARCHAR
1114from .types import REAL
1115from .types import SMALLINT
1116from .types import TEXT
1117from .types import TIME
1118from .types import TIMESTAMP
1119from .types import TINYBLOB
1120from .types import TINYINT
1121from .types import TINYTEXT
1122from .types import VARCHAR
1123from .types import YEAR
1124from ... import exc
1125from ... import literal_column
1126from ... import schema as sa_schema
1127from ... import sql
1128from ... import util
1129from ...engine import cursor as _cursor
1130from ...engine import default
1131from ...engine import reflection
1132from ...engine.reflection import ReflectionDefaults
1133from ...sql import coercions
1134from ...sql import compiler
1135from ...sql import elements
1136from ...sql import functions
1137from ...sql import operators
1138from ...sql import roles
1139from ...sql import sqltypes
1140from ...sql import util as sql_util
1141from ...sql import visitors
1142from ...sql.compiler import InsertmanyvaluesSentinelOpts
1143from ...sql.compiler import SQLCompiler
1144from ...sql.schema import SchemaConst
1145from ...types import BINARY
1146from ...types import BLOB
1147from ...types import BOOLEAN
1148from ...types import DATE
1149from ...types import LargeBinary
1150from ...types import UUID
1151from ...types import VARBINARY
1152from ...util import topological
1153
1154if TYPE_CHECKING:
1155
1156 from ...dialects.mysql import expression
1157 from ...dialects.mysql.dml import OnDuplicateClause
1158 from ...engine.base import Connection
1159 from ...engine.cursor import CursorResult
1160 from ...engine.interfaces import DBAPIConnection
1161 from ...engine.interfaces import DBAPICursor
1162 from ...engine.interfaces import DBAPIModule
1163 from ...engine.interfaces import IsolationLevel
1164 from ...engine.interfaces import PoolProxiedConnection
1165 from ...engine.interfaces import ReflectedCheckConstraint
1166 from ...engine.interfaces import ReflectedColumn
1167 from ...engine.interfaces import ReflectedForeignKeyConstraint
1168 from ...engine.interfaces import ReflectedIndex
1169 from ...engine.interfaces import ReflectedPrimaryKeyConstraint
1170 from ...engine.interfaces import ReflectedTableComment
1171 from ...engine.interfaces import ReflectedUniqueConstraint
1172 from ...engine.row import Row
1173 from ...engine.url import URL
1174 from ...schema import Table
1175 from ...sql import ddl
1176 from ...sql import selectable
1177 from ...sql.dml import _DMLTableElement
1178 from ...sql.dml import Delete
1179 from ...sql.dml import Update
1180 from ...sql.dml import ValuesBase
1181 from ...sql.functions import aggregate_strings
1182 from ...sql.functions import random
1183 from ...sql.functions import rollup
1184 from ...sql.functions import sysdate
1185 from ...sql.schema import Sequence as Sequence_SchemaItem
1186 from ...sql.type_api import TypeEngine
1187 from ...sql.visitors import ExternallyTraversible
1188
1189
# Matches text that begins (after optional whitespace) with the keyword SET,
# an optional GLOBAL / SESSION qualifier, and then a word character.
# Matching is case-insensitive.
SET_RE = re.compile(
    r"\s*SET\s+(?:(?:GLOBAL|SESSION)\s+)?\w",
    flags=re.IGNORECASE | re.UNICODE,
)
1193
# old names: legacy ``MS``-prefixed aliases.  Each name is bound to the very
# same class object as the upper-case MySQL type on the right-hand side, so
# the two spellings are interchangeable.
# NOTE(review): presumably retained for backwards compatibility with code
# importing the old names -- confirm before removing any of them.
MSTime = TIME
MSSet = SET
MSEnum = ENUM
MSLongBlob = LONGBLOB
MSMediumBlob = MEDIUMBLOB
MSTinyBlob = TINYBLOB
MSBlob = BLOB
MSBinary = BINARY
MSVarBinary = VARBINARY
MSNChar = NCHAR
MSNVarChar = NVARCHAR
MSChar = CHAR
MSString = VARCHAR
MSLongText = LONGTEXT
MSMediumText = MEDIUMTEXT
MSTinyText = TINYTEXT
MSText = TEXT
MSYear = YEAR
MSTimeStamp = TIMESTAMP
MSBit = BIT
MSSmallInteger = SMALLINT
MSTinyInteger = TINYINT
MSMediumInteger = MEDIUMINT
MSBigInteger = BIGINT
MSNumeric = NUMERIC
MSDecimal = DECIMAL
MSDouble = DOUBLE
MSReal = REAL
MSFloat = FLOAT
MSInteger = INTEGER
1225
# Type lookup table.  Keys are either generic ``sqltypes`` classes or this
# dialect's private base types (which map to themselves); values are the
# MySQL-specific type classes paired with them.
# NOTE(review): presumably consumed by the dialect's type adaptation
# machinery -- the consumer is not visible in this part of the file.
colspecs = {
    _IntegerType: _IntegerType,
    _NumericType: _NumericType,
    _FloatType: _FloatType,
    sqltypes.Numeric: NUMERIC,
    sqltypes.Float: FLOAT,
    sqltypes.Double: DOUBLE,
    sqltypes.Time: TIME,
    sqltypes.Enum: ENUM,
    sqltypes.MatchType: _MatchType,
    sqltypes.JSON: JSON,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
}
1240
# Everything 3.23 through 5.1 excepting OpenGIS types.
#
# Maps a lower-case type name to the type class that represents it.  Some
# names are synonyms for the same class ("int" / "integer" -> INTEGER,
# "decimal" / "fixed" -> DECIMAL).
# NOTE(review): presumably keyed by the type name reported by the server
# during reflection -- confirm against the reflection code.
ischema_names = {
    "bigint": BIGINT,
    "binary": BINARY,
    "bit": BIT,
    "blob": BLOB,
    "boolean": BOOLEAN,
    "char": CHAR,
    "date": DATE,
    "datetime": DATETIME,
    "decimal": DECIMAL,
    "double": DOUBLE,
    "enum": ENUM,
    "fixed": DECIMAL,
    "float": FLOAT,
    "int": INTEGER,
    "integer": INTEGER,
    "json": JSON,
    "longblob": LONGBLOB,
    "longtext": LONGTEXT,
    "mediumblob": MEDIUMBLOB,
    "mediumint": MEDIUMINT,
    "mediumtext": MEDIUMTEXT,
    "nchar": NCHAR,
    "nvarchar": NVARCHAR,
    "numeric": NUMERIC,
    "set": SET,
    "smallint": SMALLINT,
    "text": TEXT,
    "time": TIME,
    "timestamp": TIMESTAMP,
    "tinyblob": TINYBLOB,
    "tinyint": TINYINT,
    "tinytext": TINYTEXT,
    "uuid": UUID,
    "varbinary": VARBINARY,
    "varchar": VARCHAR,
    "year": YEAR,
}
1280
1281
class MySQLExecutionContext(default.DefaultExecutionContext):
    """Execution context with MySQL / MariaDB specific cursor handling."""

    def post_exec(self) -> None:
        """Install an empty buffered result for a row-less DELETE..RETURNING.

        Only acts when the statement is a DELETE, the compiled statement has
        RETURNING in effect, and the cursor reports no ``description``.
        """
        if not self.isdelete:
            return

        compiled = cast(SQLCompiler, self.compiled)
        if not compiled.effective_returning or self.cursor.description:
            return

        # All MySQL/mariadb drivers appear to not include
        # cursor.description for DELETE..RETURNING with no rows if the
        # WHERE criteria is a straight "false" condition such as our EMPTY
        # IN condition. manufacture an empty result in this case (issue
        # #10505)
        #
        # taken from cx_Oracle implementation
        column_info = [
            (entry.keyname, None)  # type: ignore[misc]
            for entry in compiled._result_columns
        ]
        self.cursor_fetch_strategy = (
            _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor, column_info, []
            )
        )

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor built from the dialect's ``_sscursor`` class.

        :raises NotImplementedError: if the dialect does not report
         ``supports_server_side_cursors``.
        """
        if not self.dialect.supports_server_side_cursors:
            raise NotImplementedError()

        cursor_cls = self.dialect._sscursor  # type: ignore[attr-defined]
        return self._dbapi_connection.cursor(cursor_cls)

    def fire_sequence(
        self, seq: Sequence_SchemaItem, type_: sqltypes.Integer
    ) -> int:
        """Execute ``select nextval(<sequence>)`` and return its scalar."""
        sequence_name = self.identifier_preparer.format_sequence(seq)
        statement = "select nextval(%s)" % sequence_name
        return self._execute_scalar(  # type: ignore[no-any-return]
            statement, type_
        )
1327
1328
1329class MySQLCompiler(compiler.SQLCompiler):
    dialect: MySQLDialect
    render_table_with_column_in_update_from = True
    """Overridden from base SQLCompiler value"""

    # work on a copy of the base compiler's ``extract_map`` so that the
    # update below does not mutate the base class's mapping; the added entry
    # maps the "milliseconds" field name to "millisecond"
    extract_map = compiler.SQLCompiler.extract_map.copy()
    extract_map.update({"milliseconds": "millisecond"})
1336
1337 def default_from(self) -> str:
1338 """Called when a ``SELECT`` statement has no froms,
1339 and no ``FROM`` clause is to be appended.
1340
1341 """
1342 if self.stack:
1343 stmt = self.stack[-1]["selectable"]
1344 if stmt._where_criteria: # type: ignore[attr-defined]
1345 return " FROM DUAL"
1346
1347 return ""
1348
1349 def visit_random_func(self, fn: random, **kw: Any) -> str:
1350 return "rand%s" % self.function_argspec(fn)
1351
1352 def visit_rollup_func(self, fn: rollup[Any], **kw: Any) -> str:
1353 clause = ", ".join(
1354 elem._compiler_dispatch(self, **kw) for elem in fn.clauses
1355 )
1356 return f"{clause} WITH ROLLUP"
1357
1358 def visit_aggregate_strings_func(
1359 self, fn: aggregate_strings, **kw: Any
1360 ) -> str:
1361 expr, delimeter = (
1362 elem._compiler_dispatch(self, **kw) for elem in fn.clauses
1363 )
1364 return f"group_concat({expr} SEPARATOR {delimeter})"
1365
1366 def visit_sequence(self, sequence: sa_schema.Sequence, **kw: Any) -> str:
1367 return "nextval(%s)" % self.preparer.format_sequence(sequence)
1368
    def visit_sysdate_func(self, fn: sysdate, **kw: Any) -> str:
        """Render the ``sysdate`` function as the fixed text ``SYSDATE()``.

        Neither ``fn`` nor the keyword arguments influence the output.
        """
        return "SYSDATE()"
1371
    def _render_json_extract_from_binary(
        self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
    ) -> str:
        """Render a JSON element access as a ``JSON_EXTRACT`` based expression.

        When the type affinity of ``binary`` is JSON, a plain
        ``JSON_EXTRACT(left, right)`` is returned.  For any other affinity
        the result is a ``CASE JSON_EXTRACT(..) WHEN 'null' THEN NULL ...
        END`` expression whose remaining branch depends on the affinity:

        * Integer - ``CAST(.. AS SIGNED INTEGER)``
        * Numeric - ``CAST(.. AS DECIMAL(p, s))`` when both precision and
          scale are set, otherwise the extracted value plus a zero literal
        * Boolean - ``WHEN true THEN true ELSE false``
        * String - ``JSON_UNQUOTE(JSON_EXTRACT(..))``
        * anything else - ``JSON_EXTRACT(..)``

        :param binary: expression whose ``left`` / ``right`` sides are
         rendered as the two arguments of each ``JSON_EXTRACT``.
        :param operator: accepted for the visitor signature; not used here.
        :return: the SQL string.
        """
        # note we are intentionally calling upon the process() calls in the
        # order in which they appear in the SQL String as this is used
        # by positional parameter rendering
        #
        # for the same reason the left / right sides are processed again for
        # every JSON_EXTRACT that appears in the output rather than being
        # rendered once and reused

        if binary.type._type_affinity is sqltypes.JSON:
            return "JSON_EXTRACT(%s, %s)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )

        # for non-JSON, MySQL doesn't handle JSON null at all so it has to
        # be explicit
        case_expression = "CASE JSON_EXTRACT(%s, %s) WHEN 'null' THEN NULL" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

        if binary.type._type_affinity is sqltypes.Integer:
            type_expression = (
                "ELSE CAST(JSON_EXTRACT(%s, %s) AS SIGNED INTEGER)"
                % (
                    self.process(binary.left, **kw),
                    self.process(binary.right, **kw),
                )
            )
        elif binary.type._type_affinity is sqltypes.Numeric:
            binary_type = cast(sqltypes.Numeric[Any], binary.type)
            if (
                binary_type.scale is not None
                and binary_type.precision is not None
            ):
                # using DECIMAL here because MySQL does not recognize NUMERIC
                type_expression = (
                    "ELSE CAST(JSON_EXTRACT(%s, %s) AS DECIMAL(%s, %s))"
                    % (
                        self.process(binary.left, **kw),
                        self.process(binary.right, **kw),
                        binary_type.precision,
                        binary_type.scale,
                    )
                )
            else:
                # FLOAT / REAL not added in MySQL til 8.0.17
                type_expression = (
                    "ELSE JSON_EXTRACT(%s, %s)+0.0000000000000000000000"
                    % (
                        self.process(binary.left, **kw),
                        self.process(binary.right, **kw),
                    )
                )
        elif binary.type._type_affinity is sqltypes.Boolean:
            # the NULL handling is particularly weird with boolean, so
            # explicitly return true/false constants
            type_expression = "WHEN true THEN true ELSE false"
        elif binary.type._type_affinity is sqltypes.String:
            # (gord): this fails with a JSON value that's a four byte unicode
            # string. SQLite has the same problem at the moment
            # (zzzeek): I'm not really sure. let's take a look at a test case
            # that hits each backend and maybe make a requires rule for it?
            type_expression = "ELSE JSON_UNQUOTE(JSON_EXTRACT(%s, %s))" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )
        else:
            # other affinity....this is not expected right now
            type_expression = "ELSE JSON_EXTRACT(%s, %s)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )

        return case_expression + " " + type_expression + " END"
1446
    def visit_json_getitem_op_binary(
        self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
    ) -> str:
        """Render a JSON "getitem" operation.

        Delegates unchanged to ``_render_json_extract_from_binary()``.
        """
        return self._render_json_extract_from_binary(binary, operator, **kw)
1451
    def visit_json_path_getitem_op_binary(
        self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
    ) -> str:
        """Render a JSON "path getitem" operation.

        Delegates unchanged to ``_render_json_extract_from_binary()``, i.e.
        it renders exactly like ``visit_json_getitem_op_binary()``.
        """
        return self._render_json_extract_from_binary(binary, operator, **kw)
1456
    def visit_on_duplicate_key_update(
        self, on_duplicate: OnDuplicateClause, **kw: Any
    ) -> str:
        """Render the ``ON DUPLICATE KEY UPDATE`` clause of an INSERT.

        One ``<column> = <value>`` assignment is rendered for each table
        column whose key is present in ``on_duplicate.update``.  Columns
        named in ``on_duplicate._parameter_ordering`` come first, in that
        order, followed by the remaining columns in table order.

        When the statement has no SELECT and the dialect sets
        ``_requires_alias_for_on_duplicate_key``, the clause is prefixed
        with ``AS <alias>`` and references to the inserted row are rendered
        as ``<alias>.<column>``; otherwise they are rendered as
        ``VALUES(<column>)``.

        A warning is emitted for update keys that match no table column.

        :param on_duplicate: the clause construct being compiled.
        :return: the SQL string.
        """
        statement: ValuesBase = self.current_executable

        cols: List[elements.KeyedColumnElement[Any]]
        if on_duplicate._parameter_ordering:
            parameter_ordering = [
                coercions.expect(roles.DMLColumnRole, key)
                for key in on_duplicate._parameter_ordering
            ]
            ordered_keys = set(parameter_ordering)
            # explicitly ordered columns first, then every other column of
            # the table in its natural order
            cols = [
                statement.table.c[key]
                for key in parameter_ordering
                if key in statement.table.c
            ] + [c for c in statement.table.c if c.key not in ordered_keys]
        else:
            cols = list(statement.table.c)

        clauses = []

        requires_mysql8_alias = statement.select is None and (
            self.dialect._requires_alias_for_on_duplicate_key
        )

        if requires_mysql8_alias:
            # pick an alias name that differs from a table that is itself
            # named "new" (compared case-insensitively)
            if statement.table.name.lower() == "new":  # type: ignore[union-attr] # noqa: E501
                _on_dup_alias_name = "new_1"
            else:
                _on_dup_alias_name = "new"

        on_duplicate_update = {
            coercions.expect_as_key(roles.DMLColumnRole, key): value
            for key, value in on_duplicate.update.items()
        }

        # traverses through all table columns to preserve table column order
        for column in (col for col in cols if col.key in on_duplicate_update):
            val = on_duplicate_update[column.key]

            # TODO: this coercion should be up front. we can't cache
            # SQL constructs with non-bound literals buried in them
            if coercions._is_literal(val):
                val = elements.BindParameter(None, val, type_=column.type)
                value_text = self.process(val.self_group(), use_schema=False)
            else:

                # replacement callback for replacement_traverse(); it is
                # invoked within this same loop iteration, so reading
                # ``column`` from the enclosing scope refers to the current
                # column
                def replace(
                    element: ExternallyTraversible, **kw: Any
                ) -> Optional[ExternallyTraversible]:
                    if (
                        isinstance(element, elements.BindParameter)
                        and element.type._isnull
                    ):
                        # untyped bind parameter: give it the column's type
                        return element._with_binary_element_type(column.type)
                    elif (
                        isinstance(element, elements.ColumnClause)
                        and element.table is on_duplicate.inserted_alias
                    ):
                        # reference to the row being inserted
                        if requires_mysql8_alias:
                            column_literal_clause = (
                                f"{_on_dup_alias_name}."
                                f"{self.preparer.quote(element.name)}"
                            )
                        else:
                            column_literal_clause = (
                                f"VALUES({self.preparer.quote(element.name)})"
                            )
                        return literal_column(column_literal_clause)
                    else:
                        # element is not replaced
                        return None

                val = visitors.replacement_traverse(val, {}, replace)
                value_text = self.process(val.self_group(), use_schema=False)

            name_text = self.preparer.quote(column.name)
            clauses.append("%s = %s" % (name_text, value_text))

        non_matching = set(on_duplicate_update) - {c.key for c in cols}
        if non_matching:
            # NOTE(review): the table name here is read from
            # ``self.statement`` while the rest of this method uses the
            # local ``statement`` (self.current_executable) -- confirm the
            # two always refer to the same statement at this point.
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.statement.table.name,  # type: ignore[union-attr]
                    (", ".join("'%s'" % c for c in non_matching)),
                )
            )

        if requires_mysql8_alias:
            return (
                f"AS {_on_dup_alias_name} "
                f"ON DUPLICATE KEY UPDATE {', '.join(clauses)}"
            )
        else:
            return f"ON DUPLICATE KEY UPDATE {', '.join(clauses)}"
1555
1556 def visit_concat_op_expression_clauselist(
1557 self, clauselist: elements.ClauseList, operator: Any, **kw: Any
1558 ) -> str:
1559 return "concat(%s)" % (
1560 ", ".join(self.process(elem, **kw) for elem in clauselist.clauses)
1561 )
1562
1563 def visit_concat_op_binary(
1564 self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
1565 ) -> str:
1566 return "concat(%s, %s)" % (
1567 self.process(binary.left, **kw),
1568 self.process(binary.right, **kw),
1569 )
1570
    # Accepted flag tuples for MATCH ... AGAINST; visit_match_op_binary()
    # raises CompileError for any combination that is not listed here.
    _match_valid_flag_combinations = frozenset(
        (
            # (boolean_mode, natural_language, query_expansion)
            (False, False, False),
            (True, False, False),
            (False, True, False),
            (False, False, True),
            (False, True, True),
        )
    )

    # SQL modifier text for each flag, in the same positional order as the
    # flag tuples above
    _match_flag_expressions = (
        "IN BOOLEAN MODE",
        "IN NATURAL LANGUAGE MODE",
        "WITH QUERY EXPANSION",
    )
1587
    def visit_mysql_match(self, element: expression.match, **kw: Any) -> str:
        """Render a MySQL ``match`` construct.

        Delegates to ``visit_match_op_binary()``, passing the element's own
        ``operator``.
        """
        return self.visit_match_op_binary(element, element.operator, **kw)
1590
1591 def visit_match_op_binary(
1592 self, binary: expression.match, operator: Any, **kw: Any
1593 ) -> str:
1594 """
1595 Note that `mysql_boolean_mode` is enabled by default because of
1596 backward compatibility
1597 """
1598
1599 modifiers = binary.modifiers
1600
1601 boolean_mode = modifiers.get("mysql_boolean_mode", True)
1602 natural_language = modifiers.get("mysql_natural_language", False)
1603 query_expansion = modifiers.get("mysql_query_expansion", False)
1604
1605 flag_combination = (boolean_mode, natural_language, query_expansion)
1606
1607 if flag_combination not in self._match_valid_flag_combinations:
1608 flags = (
1609 "in_boolean_mode=%s" % boolean_mode,
1610 "in_natural_language_mode=%s" % natural_language,
1611 "with_query_expansion=%s" % query_expansion,
1612 )
1613
1614 flags_str = ", ".join(flags)
1615
1616 raise exc.CompileError("Invalid MySQL match flags: %s" % flags_str)
1617
1618 match_clause = self.process(binary.left, **kw)
1619 against_clause = self.process(binary.right, **kw)
1620
1621 if any(flag_combination):
1622 flag_expressions = compress(
1623 self._match_flag_expressions,
1624 flag_combination,
1625 )
1626
1627 against_clause = " ".join([against_clause, *flag_expressions])
1628
1629 return "MATCH (%s) AGAINST (%s)" % (match_clause, against_clause)
1630
    def get_from_hint_text(
        self, table: selectable.FromClause, text: Optional[str]
    ) -> Optional[str]:
        """Return the hint ``text`` unchanged; ``table`` is not consulted."""
        return text
1635
    def visit_typeclause(
        self,
        typeclause: elements.TypeClause,
        type_: Optional[TypeEngine[Any]] = None,
        **kw: Any,
    ) -> Optional[str]:
        """Return the type text to render for ``typeclause``, or ``None``.

        ``None`` is returned for any type that matches none of the branches
        below; ``visit_cast()`` treats that as "CAST not supported".

        :param typeclause: the type clause being compiled.
        :param type_: type to inspect; when omitted, the dialect-level
         implementation of ``typeclause.type`` is used.  It is passed
         explicitly when recursing into a ``TypeDecorator``'s ``impl``.
        """
        if type_ is None:
            type_ = typeclause.type.dialect_impl(self.dialect)
        # NOTE: the order of the isinstance() checks below is significant;
        # e.g. TIMESTAMP is tested before the DateTime group and ENUM / SET
        # are excluded from the String branch.
        if isinstance(type_, sqltypes.TypeDecorator):
            return self.visit_typeclause(typeclause, type_.impl, **kw)  # type: ignore[arg-type] # noqa: E501
        elif isinstance(type_, sqltypes.Integer):
            if getattr(type_, "unsigned", False):
                return "UNSIGNED INTEGER"
            else:
                return "SIGNED INTEGER"
        elif isinstance(type_, sqltypes.TIMESTAMP):
            return "DATETIME"
        elif isinstance(
            type_,
            (
                sqltypes.DECIMAL,
                sqltypes.DateTime,
                sqltypes.Date,
                sqltypes.Time,
            ),
        ):
            return self.dialect.type_compiler_instance.process(type_)
        elif isinstance(type_, sqltypes.String) and not isinstance(
            type_, (ENUM, SET)
        ):
            adapted = CHAR._adapt_string_for_cast(type_)
            return self.dialect.type_compiler_instance.process(adapted)
        elif isinstance(type_, sqltypes._Binary):
            return "BINARY"
        elif isinstance(type_, sqltypes.JSON):
            return "JSON"
        elif isinstance(type_, sqltypes.NUMERIC):
            # rendered through the type compiler, then spelled as DECIMAL
            return self.dialect.type_compiler_instance.process(type_).replace(
                "NUMERIC", "DECIMAL"
            )
        elif (
            isinstance(type_, sqltypes.Float)
            and self.dialect._support_float_cast
        ):
            return self.dialect.type_compiler_instance.process(type_)
        else:
            return None
1683
1684 def visit_cast(self, cast: elements.Cast[Any], **kw: Any) -> str:
1685 type_ = self.process(cast.typeclause)
1686 if type_ is None:
1687 util.warn(
1688 "Datatype %s does not support CAST on MySQL/MariaDb; "
1689 "the CAST will be skipped."
1690 % self.dialect.type_compiler_instance.process(
1691 cast.typeclause.type
1692 )
1693 )
1694 return self.process(cast.clause.self_group(), **kw)
1695
1696 return "CAST(%s AS %s)" % (self.process(cast.clause, **kw), type_)
1697
1698 def render_literal_value(
1699 self, value: Optional[str], type_: TypeEngine[Any]
1700 ) -> str:
1701 value = super().render_literal_value(value, type_)
1702 if self.dialect._backslash_escapes:
1703 value = value.replace("\\", "\\\\")
1704 return value
1705
1706 # override native_boolean=False behavior here, as
1707 # MySQL still supports native boolean
    def visit_true(self, expr: elements.True_, **kw: Any) -> str:
        """Render the boolean true constant as the keyword ``true``."""
        return "true"
1710
    def visit_false(self, expr: elements.False_, **kw: Any) -> str:
        """Render the boolean false constant as the keyword ``false``."""
        return "false"
1713
1714 def get_select_precolumns(
1715 self, select: selectable.Select[Any], **kw: Any
1716 ) -> str:
1717 """Add special MySQL keywords in place of DISTINCT.
1718
1719 .. deprecated:: 1.4 This usage is deprecated.
1720 :meth:`_expression.Select.prefix_with` should be used for special
1721 keywords at the start of a SELECT.
1722
1723 """
1724 if isinstance(select._distinct, str):
1725 util.warn_deprecated(
1726 "Sending string values for 'distinct' is deprecated in the "
1727 "MySQL dialect and will be removed in a future release. "
1728 "Please use :meth:`.Select.prefix_with` for special keywords "
1729 "at the start of a SELECT statement",
1730 version="1.4",
1731 )
1732 return select._distinct.upper() + " "
1733
1734 return super().get_select_precolumns(select, **kw)
1735
1736 def visit_join(
1737 self,
1738 join: selectable.Join,
1739 asfrom: bool = False,
1740 from_linter: Optional[compiler.FromLinter] = None,
1741 **kwargs: Any,
1742 ) -> str:
1743 if from_linter:
1744 from_linter.edges.add((join.left, join.right))
1745
1746 if join.full:
1747 join_type = " FULL OUTER JOIN "
1748 elif join.isouter:
1749 join_type = " LEFT OUTER JOIN "
1750 else:
1751 join_type = " INNER JOIN "
1752
1753 return "".join(
1754 (
1755 self.process(
1756 join.left, asfrom=True, from_linter=from_linter, **kwargs
1757 ),
1758 join_type,
1759 self.process(
1760 join.right, asfrom=True, from_linter=from_linter, **kwargs
1761 ),
1762 " ON ",
1763 self.process(join.onclause, from_linter=from_linter, **kwargs), # type: ignore[arg-type] # noqa: E501
1764 )
1765 )
1766
1767 def for_update_clause(
1768 self, select: selectable.GenerativeSelect, **kw: Any
1769 ) -> str:
1770 assert select._for_update_arg is not None
1771 if select._for_update_arg.read:
1772 if self.dialect.use_mysql_for_share:
1773 tmp = " FOR SHARE"
1774 else:
1775 tmp = " LOCK IN SHARE MODE"
1776 else:
1777 tmp = " FOR UPDATE"
1778
1779 if select._for_update_arg.of and self.dialect.supports_for_update_of:
1780 tables: util.OrderedSet[elements.ClauseElement] = util.OrderedSet()
1781 for c in select._for_update_arg.of:
1782 tables.update(sql_util.surface_selectables_only(c))
1783
1784 tmp += " OF " + ", ".join(
1785 self.process(table, ashint=True, use_schema=False, **kw)
1786 for table in tables
1787 )
1788
1789 if select._for_update_arg.nowait:
1790 tmp += " NOWAIT"
1791
1792 if select._for_update_arg.skip_locked:
1793 tmp += " SKIP LOCKED"
1794
1795 return tmp
1796
def limit_clause(
    self, select: selectable.GenerativeSelect, **kw: Any
) -> str:
    """Render the LIMIT portion of a SELECT, or ``""`` when there is none.

    MySQL accepts ``LIMIT <limit>``, ``LIMIT <offset>, <limit>`` and
    ``LIMIT <limit> OFFSET <offset>``; this method only ever emits the
    first two forms.
    """
    limit_clause = select._limit_clause
    offset_clause = select._offset_clause

    if offset_clause is not None:
        offset_sql = self.process(offset_clause, **kw)
        if limit_clause is None:
            # TODO: remove ??
            # An OFFSET with no LIMIT still needs a limit value in
            # MySQL's syntax, so the upper bound is hardwired, as
            # suggested by the MySQL docs:
            # https://dev.mysql.com/doc/refman/5.0/en/select.html
            limit_sql = "18446744073709551615"
        else:
            limit_sql = self.process(limit_clause, **kw)
        return " \n LIMIT %s, %s" % (offset_sql, limit_sql)

    if limit_clause is None:
        # neither LIMIT nor OFFSET was requested
        return ""

    # No offset provided, so just use the limit
    return " \n LIMIT %s" % (self.process(limit_clause, **kw),)
1838
def update_limit_clause(self, update_stmt: Update) -> Optional[str]:
    """Return ``LIMIT <n>`` for an UPDATE carrying the dialect's
    ``<dialect name>_limit`` option, or ``None`` when it is unset.

    The option value is coerced with ``int()`` before rendering.
    """
    option_key = "%s_limit" % self.dialect.name
    row_limit = update_stmt.kwargs.get(option_key, None)
    if row_limit is None:
        return None
    return f"LIMIT {int(row_limit)}"
1845
def delete_limit_clause(self, delete_stmt: Delete) -> Optional[str]:
    """Return ``LIMIT <n>`` for a DELETE carrying the dialect's
    ``<dialect name>_limit`` option, or ``None`` when it is unset.

    The option value is coerced with ``int()`` before rendering.
    """
    option_key = "%s_limit" % self.dialect.name
    row_limit = delete_stmt.kwargs.get(option_key, None)
    if row_limit is None:
        return None
    return f"LIMIT {int(row_limit)}"
1852
def update_tables_clause(
    self,
    update_stmt: Update,
    from_table: _DMLTableElement,
    extra_froms: List[selectable.FromClause],
    **kw: Any,
) -> str:
    """Render the comma-separated table list of an UPDATE.

    The target table comes first, followed by every extra FROM element;
    each one is compiled with ``asfrom=True``.
    """
    kw["asfrom"] = True
    rendered = []
    for table in [from_table] + list(extra_froms):
        rendered.append(table._compiler_dispatch(self, **kw))
    return ", ".join(rendered)
1865
def update_from_clause(
    self,
    update_stmt: Update,
    from_table: _DMLTableElement,
    extra_froms: List[selectable.FromClause],
    from_hints: Any,
    **kw: Any,
) -> None:
    """Render nothing: this compiler emits no separate FROM clause
    for an UPDATE and always returns ``None``.
    """
    return None
1875
def delete_table_clause(
    self,
    delete_stmt: Delete,
    from_table: _DMLTableElement,
    extra_froms: List[selectable.FromClause],
    **kw: Any,
) -> str:
    """Render the target table of a DELETE.

    When extra FROM elements are present the table is compiled with
    ``ashint=True`` so that any alias is rendered as a hint.
    """
    render_as_hint = bool(extra_froms)
    return from_table._compiler_dispatch(
        self, asfrom=True, iscrud=True, ashint=render_as_hint, **kw
    )
1890
def delete_extra_from_clause(
    self,
    delete_stmt: Delete,
    from_table: _DMLTableElement,
    extra_froms: List[selectable.FromClause],
    from_hints: Any,
    **kw: Any,
) -> str:
    """Render the DELETE .. USING clause specific to MySQL.

    The target table is listed first, then each extra FROM element; all
    are compiled with ``asfrom=True`` and the given ``from_hints``.
    """
    kw["asfrom"] = True
    rendered = [
        table._compiler_dispatch(self, fromhints=from_hints, **kw)
        for table in [from_table] + extra_froms
    ]
    return "USING " + ", ".join(rendered)
1905
def visit_empty_set_expr(
    self, element_types: List[TypeEngine[Any]], **kw: Any
) -> str:
    """Render a subquery that yields zero rows.

    One placeholder column (``_in_0``, ``_in_1``, ...) is produced per
    entry in ``element_types``; the ``WHERE 1!=1`` criterion guarantees
    that the SELECT returns nothing.
    """
    inner_columns = ", ".join(
        "1 AS _in_%s" % position
        for position, _ in enumerate(element_types)
    )
    outer_columns = ", ".join(
        "_in_%s" % position for position, _ in enumerate(element_types)
    )
    template = (
        "SELECT %(outer)s FROM (SELECT %(inner)s) "
        "as _empty_set WHERE 1!=1"
    )
    return template % {"inner": inner_columns, "outer": outer_columns}
1922
def visit_is_distinct_from_binary(
    self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Render ``IS DISTINCT FROM`` as a negated ``<=>`` comparison.

    The compile-time keyword arguments are forwarded to both operands so
    that flags such as ``literal_binds`` apply to them as well.
    """
    return "NOT (%s <=> %s)" % (
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    )
1930
def visit_is_not_distinct_from_binary(
    self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Render ``IS NOT DISTINCT FROM`` using the ``<=>`` operator.

    The compile-time keyword arguments are forwarded to both operands so
    that flags such as ``literal_binds`` apply to them as well.
    """
    return "%s <=> %s" % (
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    )
1938
def _mariadb_regexp_flags(
    self, flags: str, pattern: elements.ColumnElement[Any], **kw: Any
) -> str:
    """Render a pattern prefixed with inline regexp flags.

    The flags are rendered as a string literal and wrapped as ``(?flags)``
    in front of the compiled pattern by means of ``CONCAT()``.
    """
    flags_sql = self.render_literal_value(flags, sqltypes.STRINGTYPE)
    pattern_sql = self.process(pattern, **kw)
    return "CONCAT('(?', %s, ')', %s)" % (flags_sql, pattern_sql)
1946
1947 def _regexp_match(
1948 self,
1949 op_string: str,
1950 binary: elements.BinaryExpression[Any],
1951 operator: Any,
1952 **kw: Any,
1953 ) -> str:
1954 assert binary.modifiers is not None
1955 flags = binary.modifiers["flags"]
1956 if flags is None:
1957 return self._generate_generic_binary(binary, op_string, **kw)
1958 elif self.dialect.is_mariadb:
1959 return "%s%s%s" % (
1960 self.process(binary.left, **kw),
1961 op_string,
1962 self._mariadb_regexp_flags(flags, binary.right),
1963 )
1964 else:
1965 text = "REGEXP_LIKE(%s, %s, %s)" % (
1966 self.process(binary.left, **kw),
1967 self.process(binary.right, **kw),
1968 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1969 )
1970 if op_string == " NOT REGEXP ":
1971 return "NOT %s" % text
1972 else:
1973 return text
1974
def visit_regexp_match_op_binary(
    self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Render a regexp match by delegating to ``_regexp_match()`` with
    the ``REGEXP`` operator text.
    """
    op_string = " REGEXP "
    return self._regexp_match(op_string, binary, operator, **kw)
1979
def visit_not_regexp_match_op_binary(
    self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Render a negated regexp match by delegating to ``_regexp_match()``
    with the ``NOT REGEXP`` operator text.
    """
    op_string = " NOT REGEXP "
    return self._regexp_match(op_string, binary, operator, **kw)
1984
def visit_regexp_replace_op_binary(
    self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Render a ``REGEXP_REPLACE()`` call.

    ``binary.left`` is the target expression and ``binary.right`` holds
    the pattern and replacement; ``binary.modifiers["flags"]`` carries
    the optional regexp flags.  On MariaDB the flags are embedded in the
    pattern via ``_mariadb_regexp_flags()``; otherwise they are appended
    as a trailing string-literal argument.
    """
    assert binary.modifiers is not None
    flags = binary.modifiers["flags"]
    if flags is None:
        return "REGEXP_REPLACE(%s, %s)" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )
    elif self.dialect.is_mariadb:
        return "REGEXP_REPLACE(%s, %s, %s)" % (
            self.process(binary.left, **kw),
            # forward **kw so the pattern is compiled with the same
            # options (e.g. literal_binds) as the other operands
            self._mariadb_regexp_flags(
                flags, binary.right.clauses[0], **kw
            ),
            self.process(binary.right.clauses[1], **kw),
        )
    else:
        # NOTE(review): the flags land directly after the replacement
        # argument; MySQL documents match_type as the last of three
        # optional arguments (pos, occurrence, match_type) -- confirm
        # this position is what the server expects.
        return "REGEXP_REPLACE(%s, %s, %s)" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
            self.render_literal_value(flags, sqltypes.STRINGTYPE),
        )
2007
2008
class MySQLDDLCompiler(compiler.DDLCompiler):
    """DDL compiler rendering MySQL / MariaDB flavored schema statements."""

    dialect: MySQLDialect

    def get_column_specification(
        self, column: sa_schema.Column[Any], **kw: Any
    ) -> str:
        """Build the DDL fragment for one column.

        The fragment is assembled in this order: name, type, computed
        expression, nullability, ``COMMENT``, then either
        ``AUTO_INCREMENT`` or the ``DEFAULT`` clause.
        """
        dialect = self.dialect

        # a computed column on MariaDB with unspecified nullability is
        # flagged nullable here, so no "NOT NULL" is rendered for it below
        if (
            dialect.is_mariadb is True
            and column.computed is not None
            and column._user_defined_nullable is SchemaConst.NULL_UNSPECIFIED
        ):
            column.nullable = True

        pieces = [
            self.preparer.format_column(column),
            dialect.type_compiler_instance.process(
                column.type, type_expression=column
            ),
        ]

        if column.computed is not None:
            pieces.append(self.process(column.computed))

        is_timestamp = isinstance(
            column.type._unwrapped_dialect_impl(dialect),
            sqltypes.TIMESTAMP,
        )

        if not column.nullable:
            pieces.append("NOT NULL")
        elif is_timestamp:
            # a nullable TIMESTAMP gets an explicit NULL, see:
            # https://docs.sqlalchemy.org/en/latest/dialects/mysql.html#mysql_timestamp_null # noqa
            pieces.append("NULL")

        comment = column.comment
        if comment is not None:
            pieces.append(
                "COMMENT "
                + self.sql_compiler.render_literal_value(
                    comment, sqltypes.String()
                )
            )

        uses_autoincrement = (
            column.table is not None
            and column is column.table._autoincrement_column
            and (
                column.server_default is None
                or isinstance(column.server_default, sa_schema.Identity)
            )
            and not (
                dialect.supports_sequences
                and isinstance(column.default, sa_schema.Sequence)
                and not column.default.optional
            )
        )
        if uses_autoincrement:
            pieces.append("AUTO_INCREMENT")
            return " ".join(pieces)

        default_sql = self.get_column_default_string(column)
        if default_sql is None:
            return " ".join(pieces)

        # a default is parenthesized when the dialect supports default
        # functions and the text is not already quoted / parenthesized,
        # carries no ON UPDATE, is not a now(n) / current_timestamp(n)
        # call, and contains at least one non-word character
        wrap_in_parens = (
            dialect._support_default_function
            and not re.match(r"^\s*[\'\"\(]", default_sql)
            and not re.search(r"ON +UPDATE", default_sql, re.I)
            and not re.match(
                r"\bnow\(\d+\)|\bcurrent_timestamp\(\d+\)",
                default_sql,
                re.I,
            )
            and re.match(r".*\W.*", default_sql)
        )
        if wrap_in_parens:
            pieces.append(f"DEFAULT ({default_sql})")
        else:
            pieces.append("DEFAULT " + default_sql)
        return " ".join(pieces)

    def post_create_table(self, table: sa_schema.Table) -> str:
        """Build table-level CREATE options like ENGINE and COLLATE.

        Options are read from the table keyword arguments carrying the
        ``<dialect name>_`` prefix; the table comment, when present, is
        rendered as the ``COMMENT`` option.  Partitioning options are
        emitted after all other options.
        """
        rendered_opts = []

        prefix = "%s_" % self.dialect.name
        opts = {
            key[len(prefix) :].upper(): value
            for key, value in table.kwargs.items()
            if key.startswith(prefix)
        }

        if table.comment is not None:
            opts["COMMENT"] = table.comment

        partition_names = [
            "PARTITION_BY",
            "PARTITIONS",
            "SUBPARTITIONS",
            "SUBPARTITION_BY",
        ]

        plain_opts = set(opts).difference(partition_names)
        partition_opts = set(opts).intersection(partition_names)

        # character set options must come before COLLATE
        charset_before_collate = [
            ("DEFAULT_CHARSET", "COLLATE"),
            ("DEFAULT_CHARACTER_SET", "COLLATE"),
            ("CHARSET", "COLLATE"),
            ("CHARACTER_SET", "COLLATE"),
        ]
        for opt in topological.sort(charset_before_collate, plain_opts):
            arg = opts[opt]
            if opt in _reflection._options_of_type_string:
                arg = self.sql_compiler.render_literal_value(
                    arg, sqltypes.String()
                )

            # these keywords are spelled with spaces in the DDL
            if opt in (
                "DATA_DIRECTORY",
                "INDEX_DIRECTORY",
                "DEFAULT_CHARACTER_SET",
                "CHARACTER_SET",
                "DEFAULT_CHARSET",
                "DEFAULT_COLLATE",
            ):
                opt = opt.replace("_", " ")

            if opt in (
                "TABLESPACE",
                "DEFAULT CHARACTER SET",
                "CHARACTER SET",
                "COLLATE",
            ):
                joiner = " "
            else:
                joiner = "="

            rendered_opts.append(joiner.join((opt, arg)))

        # partition clauses are rendered from outermost to innermost
        partition_order = [
            ("PARTITION_BY", "PARTITIONS"),
            ("PARTITION_BY", "SUBPARTITION_BY"),
            ("PARTITION_BY", "SUBPARTITIONS"),
            ("PARTITIONS", "SUBPARTITIONS"),
            ("PARTITIONS", "SUBPARTITION_BY"),
            ("SUBPARTITION_BY", "SUBPARTITIONS"),
        ]
        for opt in topological.sort(partition_order, partition_opts):
            arg = opts[opt]
            if opt in _reflection._options_of_type_string:
                arg = self.sql_compiler.render_literal_value(
                    arg, sqltypes.String()
                )

            rendered_opts.append(" ".join((opt.replace("_", " "), arg)))

        return " ".join(rendered_opts)

    def visit_create_index(self, create: ddl.CreateIndex, **kw: Any) -> str:  # type: ignore[override] # noqa: E501
        """Render ``CREATE [UNIQUE] [prefix] INDEX ... ON table (...)``.

        Supports the per-index ``length``, ``prefix``, ``with_parser``
        and ``using`` dialect options.
        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        table = preparer.format_table(index.table)  # type: ignore[arg-type]

        def needs_grouping(expr: Any) -> bool:
            # binary expressions, functions and unary expressions other
            # than ASC / DESC are wrapped in parenthesis
            return (
                isinstance(expr, elements.BinaryExpression)
                or (
                    isinstance(expr, elements.UnaryExpression)
                    and expr.modifier
                    not in (operators.desc_op, operators.asc_op)
                )
                or isinstance(expr, functions.FunctionElement)
            )

        columns = [
            self.sql_compiler.process(
                elements.Grouping(expr) if needs_grouping(expr) else expr,  # type: ignore[arg-type] # noqa: E501
                include_table=False,
                literal_binds=True,
            )
            for expr in index.expressions
        ]

        name = self._prepared_index_name(index)

        chunks = ["CREATE "]
        if index.unique:
            chunks.append("UNIQUE ")

        index_prefix = index.kwargs.get("%s_prefix" % self.dialect.name, None)
        if index_prefix:
            chunks.append(index_prefix + " ")

        chunks.append("INDEX ")
        if create.if_not_exists:
            chunks.append("IF NOT EXISTS ")
        chunks.append("%s ON %s " % (name, table))

        length = index.dialect_options[self.dialect.name]["length"]
        if length is None:
            columns_str = ", ".join(columns)
        elif isinstance(length, dict):
            # length value can be a (column_name --> integer value)
            # mapping specifying the prefix length for each column of the
            # index; the rendered expression is tried as a key as well
            rendered = []
            for col, expr in zip(index.expressions, columns):
                if col.name in length:  # type: ignore[union-attr]
                    rendered.append("%s(%d)" % (expr, length[col.name]))  # type: ignore[union-attr] # noqa: E501
                elif expr in length:
                    rendered.append("%s(%d)" % (expr, length[expr]))
                else:
                    rendered.append("%s" % expr)
            columns_str = ", ".join(rendered)
        else:
            # or can be an integer value specifying the same
            # prefix length for all columns of the index
            columns_str = ", ".join(
                "%s(%d)" % (col, length) for col in columns
            )
        chunks.append("(%s)" % columns_str)

        parser = index.dialect_options["mysql"]["with_parser"]
        if parser is not None:
            chunks.append(" WITH PARSER %s" % (parser,))

        using = index.dialect_options["mysql"]["using"]
        if using is not None:
            chunks.append(" USING %s" % (preparer.quote(using)))

        return "".join(chunks)

    def visit_primary_key_constraint(
        self, constraint: sa_schema.PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render a PRIMARY KEY constraint, appending ``USING <type>``
        when the ``mysql`` ``using`` option is set on the constraint.
        """
        rendered = super().visit_primary_key_constraint(constraint)
        using = constraint.dialect_options["mysql"]["using"]
        if not using:
            return rendered
        return rendered + " USING %s" % (self.preparer.quote(using))

    def visit_drop_index(self, drop: ddl.DropIndex, **kw: Any) -> str:
        """Render ``DROP INDEX [IF EXISTS] <name> ON <table>``."""
        index = drop.element
        if drop.if_exists:
            head = "\nDROP INDEX " + "IF EXISTS "
        else:
            head = "\nDROP INDEX "

        index_name = self._prepared_index_name(index, include_schema=False)
        table_name = self.preparer.format_table(index.table)  # type: ignore[arg-type] # noqa: E501
        return head + "%s ON %s" % (index_name, table_name)

    def visit_drop_constraint(
        self, drop: ddl.DropConstraint, **kw: Any
    ) -> str:
        """Render ``ALTER TABLE <table> DROP <kind> <name>``.

        The keyword depends on the constraint class: ``FOREIGN KEY``,
        ``PRIMARY KEY`` (rendered without a name), ``INDEX`` for unique
        constraints, and ``CONSTRAINT`` (MariaDB) or ``CHECK`` (MySQL)
        for check constraints.  Any other constraint renders its name
        only.
        """
        constraint = drop.element
        named = True
        if isinstance(constraint, sa_schema.ForeignKeyConstraint):
            qual = "FOREIGN KEY "
        elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
            qual = "PRIMARY KEY "
            named = False
        elif isinstance(constraint, sa_schema.UniqueConstraint):
            qual = "INDEX "
        elif isinstance(constraint, sa_schema.CheckConstraint):
            qual = "CONSTRAINT " if self.dialect.is_mariadb else "CHECK "
        else:
            qual = ""

        const = self.preparer.format_constraint(constraint) if named else ""
        return "ALTER TABLE %s DROP %s%s" % (
            self.preparer.format_table(constraint.table),
            qual,
            const,
        )

    def define_constraint_match(
        self, constraint: sa_schema.ForeignKeyConstraint
    ) -> str:
        """Refuse to render a ``MATCH`` clause.

        :raises CompileError: when the constraint specifies ``match``.
        """
        if constraint.match is None:
            return ""
        raise exc.CompileError(
            "MySQL ignores the 'MATCH' keyword while at the same time "
            "causes ON UPDATE/ON DELETE clauses to be ignored."
        )

    def visit_set_table_comment(
        self, create: ddl.SetTableComment, **kw: Any
    ) -> str:
        """Render ``ALTER TABLE <table> COMMENT <literal>``."""
        table = create.element
        table_name = self.preparer.format_table(table)
        comment_sql = self.sql_compiler.render_literal_value(
            table.comment, sqltypes.String()
        )
        return "ALTER TABLE %s COMMENT %s" % (table_name, comment_sql)

    def visit_drop_table_comment(
        self, drop: ddl.DropTableComment, **kw: Any
    ) -> str:
        """Render an ``ALTER TABLE`` that sets the table comment to the
        empty string.
        """
        table_name = self.preparer.format_table(drop.element)
        return "ALTER TABLE %s COMMENT ''" % (table_name)

    def visit_set_column_comment(
        self, create: ddl.SetColumnComment, **kw: Any
    ) -> str:
        """Render ``ALTER TABLE <table> CHANGE <col> <full column spec>``;
        the column specification is what carries the comment.
        """
        column = create.element
        table_name = self.preparer.format_table(column.table)
        column_name = self.preparer.format_column(column)
        column_spec = self.get_column_specification(column)
        return "ALTER TABLE %s CHANGE %s %s" % (
            table_name,
            column_name,
            column_spec,
        )
2332
2333
class MySQLTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler rendering MySQL / MariaDB type declarations."""

    def _extend_numeric(self, type_: _NumericType, spec: str) -> str:
        """Extend a numeric-type declaration with MySQL specific extensions.

        ``UNSIGNED`` and ``ZEROFILL`` are appended only for types accepted
        by ``_mysql_type()``; any other type returns ``spec`` unchanged.
        """
        if not self._mysql_type(type_):
            return spec

        if type_.unsigned:
            spec += " UNSIGNED"
        if type_.zerofill:
            spec += " ZEROFILL"
        return spec

    def _extend_string(
        self, type_: _StringType, defaults: Dict[str, Any], spec: str
    ) -> str:
        """Extend a string-type declaration with standard SQL CHARACTER SET /
        COLLATE annotations and MySQL specific extensions.

        :param type_: the type being rendered.
        :param defaults: fallback values used for any attribute that
         ``type_`` does not define.
        :param spec: the base declaration, e.g. ``"VARCHAR(30)"``.
        """

        def attr(name: str) -> Any:
            return getattr(type_, name, defaults.get(name))

        charset: Optional[str]
        if attr("charset"):
            charset = "CHARACTER SET %s" % attr("charset")
        elif attr("ascii"):
            charset = "ASCII"
        elif attr("unicode"):
            charset = "UNICODE"
        else:
            charset = None

        collation: Optional[str]
        if attr("collation"):
            # read through attr() so a collation supplied only via
            # ``defaults`` renders instead of raising AttributeError
            collation = "COLLATE %s" % attr("collation")
        elif attr("binary"):
            collation = "BINARY"
        else:
            collation = None

        if attr("national"):
            # NATIONAL (aka NCHAR/NVARCHAR) trumps charsets.
            parts = ("NATIONAL", spec, collation)
        else:
            parts = (spec, charset, collation)
        return " ".join([part for part in parts if part is not None])

    def _mysql_type(self, type_: Any) -> bool:
        """Return True if ``type_`` is one of this dialect's string or
        numeric types.
        """
        return isinstance(type_, (_StringType, _NumericType))

    def _render_precision_scale(self, type_: Any, name: str) -> str:
        """Render ``name``, ``name(p)`` or ``name(p, s)`` depending on
        which of precision / scale are set, plus numeric extensions.
        """
        if type_.precision is None:
            spec = name
        elif type_.scale is None:
            spec = "%s(%s)" % (name, type_.precision)
        else:
            spec = "%s(%s, %s)" % (name, type_.precision, type_.scale)
        return self._extend_numeric(type_, spec)

    def _render_paired_precision(self, type_: Any, name: str) -> str:
        """Render ``name(p, s)`` only when both precision and scale are
        set, otherwise the bare ``name``, plus numeric extensions.
        """
        if type_.precision is not None and type_.scale is not None:
            spec = "%s(%s, %s)" % (name, type_.precision, type_.scale)
        else:
            spec = name
        return self._extend_numeric(type_, spec)

    def _render_integer(self, type_: Any, name: str) -> str:
        """Render an integer type, including the display width when a
        MySQL-specific type provides one, plus numeric extensions.
        """
        if self._mysql_type(type_) and type_.display_width is not None:
            spec = "%s(%s)" % (name, type_.display_width)
        else:
            spec = name
        return self._extend_numeric(type_, spec)

    def _render_fsp(self, type_: Any, name: str) -> str:
        """Render a date/time type, appending ``(fsp)`` when the type has
        a truthy ``fsp`` attribute.
        """
        if getattr(type_, "fsp", None):
            return "%s(%d)" % (name, type_.fsp)
        return name

    def visit_NUMERIC(self, type_: NUMERIC, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_precision_scale(type_, "NUMERIC")

    def visit_DECIMAL(self, type_: DECIMAL, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_precision_scale(type_, "DECIMAL")

    def visit_DOUBLE(self, type_: DOUBLE, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_paired_precision(type_, "DOUBLE")

    def visit_REAL(self, type_: REAL, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_paired_precision(type_, "REAL")

    def visit_FLOAT(self, type_: FLOAT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        # the two-argument form is rendered for MySQL-specific types only
        if (
            self._mysql_type(type_)
            and type_.scale is not None
            and type_.precision is not None
        ):
            spec = "FLOAT(%s, %s)" % (type_.precision, type_.scale)
        elif type_.precision is not None:
            spec = "FLOAT(%s)" % (type_.precision,)
        else:
            spec = "FLOAT"
        return self._extend_numeric(type_, spec)

    def visit_INTEGER(self, type_: INTEGER, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_integer(type_, "INTEGER")

    def visit_BIGINT(self, type_: BIGINT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_integer(type_, "BIGINT")

    def visit_MEDIUMINT(self, type_: MEDIUMINT, **kw: Any) -> str:
        return self._render_integer(type_, "MEDIUMINT")

    def visit_TINYINT(self, type_: TINYINT, **kw: Any) -> str:
        return self._render_integer(type_, "TINYINT")

    def visit_SMALLINT(self, type_: SMALLINT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_integer(type_, "SMALLINT")

    def visit_BIT(self, type_: BIT, **kw: Any) -> str:
        if type_.length is not None:
            return "BIT(%s)" % type_.length
        return "BIT"

    def visit_DATETIME(self, type_: DATETIME, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_fsp(type_, "DATETIME")

    def visit_DATE(self, type_: DATE, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return "DATE"

    def visit_TIME(self, type_: TIME, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_fsp(type_, "TIME")

    def visit_TIMESTAMP(self, type_: TIMESTAMP, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return self._render_fsp(type_, "TIMESTAMP")

    def visit_YEAR(self, type_: YEAR, **kw: Any) -> str:
        if type_.display_width is None:
            return "YEAR"
        return "YEAR(%s)" % type_.display_width

    def visit_TEXT(self, type_: TEXT, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        if type_.length is not None:
            return self._extend_string(type_, {}, "TEXT(%d)" % type_.length)
        return self._extend_string(type_, {}, "TEXT")

    def visit_TINYTEXT(self, type_: TINYTEXT, **kw: Any) -> str:
        return self._extend_string(type_, {}, "TINYTEXT")

    def visit_MEDIUMTEXT(self, type_: MEDIUMTEXT, **kw: Any) -> str:
        return self._extend_string(type_, {}, "MEDIUMTEXT")

    def visit_LONGTEXT(self, type_: LONGTEXT, **kw: Any) -> str:
        return self._extend_string(type_, {}, "LONGTEXT")

    def visit_VARCHAR(self, type_: VARCHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        if type_.length is None:
            raise exc.CompileError(
                "VARCHAR requires a length on dialect %s" % self.dialect.name
            )
        return self._extend_string(type_, {}, "VARCHAR(%d)" % type_.length)

    def visit_CHAR(self, type_: CHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        if type_.length is not None:
            return self._extend_string(
                type_, {}, "CHAR(%(length)s)" % {"length": type_.length}
            )
        return self._extend_string(type_, {}, "CHAR")

    def visit_NVARCHAR(self, type_: NVARCHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        # We'll actually generate the equiv. "NATIONAL VARCHAR" instead
        # of "NVARCHAR".
        if type_.length is None:
            raise exc.CompileError(
                "NVARCHAR requires a length on dialect %s" % self.dialect.name
            )
        return self._extend_string(
            type_,
            {"national": True},
            "VARCHAR(%(length)s)" % {"length": type_.length},
        )

    def visit_NCHAR(self, type_: NCHAR, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        # We'll actually generate the equiv.
        # "NATIONAL CHAR" instead of "NCHAR".
        if type_.length is not None:
            return self._extend_string(
                type_,
                {"national": True},
                "CHAR(%(length)s)" % {"length": type_.length},
            )
        return self._extend_string(type_, {"national": True}, "CHAR")

    def visit_UUID(self, type_: UUID[Any], **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        return "UUID"

    def visit_VARBINARY(self, type_: VARBINARY, **kw: Any) -> str:
        return "VARBINARY(%d)" % type_.length  # type: ignore[str-format]

    def visit_JSON(self, type_: JSON, **kw: Any) -> str:
        return "JSON"

    def visit_large_binary(self, type_: LargeBinary, **kw: Any) -> str:
        return self.visit_BLOB(type_)

    def visit_enum(self, type_: ENUM, **kw: Any) -> str:  # type: ignore[override] # NOQA: E501
        # a non-native enum falls back to the generic rendering
        if not type_.native_enum:
            return super().visit_enum(type_)
        return self._visit_enumerated_values("ENUM", type_, type_.enums)

    def visit_BLOB(self, type_: LargeBinary, **kw: Any) -> str:
        if type_.length is not None:
            return "BLOB(%d)" % type_.length
        return "BLOB"

    def visit_TINYBLOB(self, type_: TINYBLOB, **kw: Any) -> str:
        return "TINYBLOB"

    def visit_MEDIUMBLOB(self, type_: MEDIUMBLOB, **kw: Any) -> str:
        return "MEDIUMBLOB"

    def visit_LONGBLOB(self, type_: LONGBLOB, **kw: Any) -> str:
        return "LONGBLOB"

    def _visit_enumerated_values(
        self, name: str, type_: _StringType, enumerated_values: Sequence[str]
    ) -> str:
        """Render ``name('a','b',...)`` with each value quoted.

        Single quotes are doubled, and percent signs are doubled when the
        identifier preparer has ``_double_percents`` set.
        """
        quoted_enums = []
        for value in enumerated_values:
            if self.dialect.identifier_preparer._double_percents:
                value = value.replace("%", "%%")
            quoted_enums.append("'%s'" % value.replace("'", "''"))
        return self._extend_string(
            type_, {}, "%s(%s)" % (name, ",".join(quoted_enums))
        )

    def visit_ENUM(self, type_: ENUM, **kw: Any) -> str:
        return self._visit_enumerated_values("ENUM", type_, type_.enums)

    def visit_SET(self, type_: SET, **kw: Any) -> str:
        return self._visit_enumerated_values("SET", type_, type_.values)

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOL"
2644
2645
class MySQLIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer using the MySQL reserved words and quoting
    with backticks, or with double quotes when ``server_ansiquotes`` is
    set.
    """

    reserved_words = RESERVED_WORDS_MYSQL

    def __init__(
        self,
        dialect: default.DefaultDialect,
        server_ansiquotes: bool = False,
        **kw: Any,
    ):
        # the same character serves as initial quote and as escape quote
        quote_char = '"' if server_ansiquotes else "`"
        super().__init__(
            dialect, initial_quote=quote_char, escape_quote=quote_char
        )

    def _quote_free_identifiers(self, *ids: Optional[str]) -> Tuple[str, ...]:
        """Unilaterally identifier-quote any number of strings.

        ``None`` entries are skipped.
        """
        return tuple(
            self.quote_identifier(ident) for ident in ids if ident is not None
        )
2666
2667
class MariaDBIdentifierPreparer(MySQLIdentifierPreparer):
    """Identifier preparer that swaps in the MariaDB reserved word list;
    everything else is inherited from :class:`MySQLIdentifierPreparer`.
    """

    reserved_words = RESERVED_WORDS_MARIADB
2670
2671
class MySQLDialect(default.DefaultDialect):
    """Details of the MySQL dialect.

    Not used directly in application code.  The class-level attributes
    below are defaults; several of them are adjusted per connection once
    the server version is known (see the individual comments).
    """

    name = "mysql"
    supports_statement_cache = True

    supports_alter = True

    # MySQL has no true "boolean" type; we
    # allow for the "true" and "false" keywords, however
    supports_native_boolean = False

    # support for BIT type; mysqlconnector coerces result values automatically,
    # all other MySQL DBAPIs require a conversion routine
    supports_native_bit = False

    # identifiers are 64, however aliases can be 255...
    max_identifier_length = 255
    max_index_name_length = 64
    max_constraint_name_length = 64

    div_is_floordiv = False

    supports_native_enum = True

    returns_native_bytes = True

    # ... may be updated to True for MariaDB 10.3+ in initialize()
    supports_sequences = False

    sequences_optional = False

    # ... may be updated to True for MySQL 8+ in initialize()
    supports_for_update_of = False

    # mysql 8.0.1 uses this syntax; when True, read locks render
    # "FOR SHARE" instead of "LOCK IN SHARE MODE"
    use_mysql_for_share = False

    # Only available in MySQL 8+.
    # NOTE(review): presumably toggles the row-alias form of
    # ON DUPLICATE KEY UPDATE -- confirm where this flag is set and read
    _requires_alias_for_on_duplicate_key = False

    # MySQL doesn't support "DEFAULT VALUES" but *does* support
    # "VALUES (DEFAULT)"
    supports_default_values = False
    supports_default_metavalue = True

    use_insertmanyvalues: bool = True
    insertmanyvalues_implicit_sentinel = (
        InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
    )

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = False
    supports_multivalues_insert = True
    insert_null_pk_still_autoincrements = True

    supports_comments = True
    inline_comments = True
    default_paramstyle = "format"
    colspecs = colspecs

    cte_follows_insert = True

    # compiler / preparer classes used by this dialect
    statement_compiler = MySQLCompiler
    ddl_compiler = MySQLDDLCompiler
    type_compiler_cls = MySQLTypeCompiler
    ischema_names = ischema_names
    preparer: type[MySQLIdentifierPreparer] = MySQLIdentifierPreparer

    # MariaDB detection state; is_mariadb can be passed to the
    # constructor and is also established when the server version string
    # is parsed
    is_mariadb: bool = False
    _mariadb_normalized_version_info = None

    # default SQL compilation settings -
    # these are modified upon initialize(),
    # i.e. first connect
    _backslash_escapes = True
    _server_ansiquotes = False

    server_version_info: Tuple[int, ...]
    identifier_preparer: MySQLIdentifierPreparer

    # dialect-specific keyword arguments accepted by each construct;
    # Table accepts any "<dialect name>_" prefixed argument ("*")
    construct_arguments = [
        (sa_schema.Table, {"*": None}),
        (sql.Update, {"limit": None}),
        (sql.Delete, {"limit": None}),
        (sa_schema.PrimaryKeyConstraint, {"using": None}),
        (
            sa_schema.Index,
            {
                "using": None,
                "length": None,
                "prefix": None,
                "with_parser": None,
            },
        ),
    ]
2770
def __init__(
    self,
    json_serializer: Optional[Callable[..., Any]] = None,
    json_deserializer: Optional[Callable[..., Any]] = None,
    is_mariadb: Optional[bool] = None,
    **kwargs: Any,
) -> None:
    """Construct the dialect.

    :param json_serializer: optional callable stored as
     ``_json_serializer``.
    :param json_deserializer: optional callable stored as
     ``_json_deserializer``.
    :param is_mariadb: optional MariaDB flag handed to
     ``_set_mariadb()``; ``None`` leaves the decision open.
    """
    # "use_ansiquotes" is a legacy argument: accepted and discarded
    if "use_ansiquotes" in kwargs:
        del kwargs["use_ansiquotes"]
    default.DefaultDialect.__init__(self, **kwargs)
    self._json_serializer, self._json_deserializer = (
        json_serializer,
        json_deserializer,
    )
    self._set_mariadb(is_mariadb, ())
2783
def get_isolation_level_values(
    self, dbapi_conn: DBAPIConnection
) -> Sequence[IsolationLevel]:
    """Return the isolation level names accepted by this dialect.

    The given connection is not consulted; the list is fixed.
    """
    supported_levels = (
        "SERIALIZABLE",
        "READ UNCOMMITTED",
        "READ COMMITTED",
        "REPEATABLE READ",
    )
    return supported_levels
2793
def set_isolation_level(
    self, dbapi_connection: DBAPIConnection, level: IsolationLevel
) -> None:
    """Set the session transaction isolation level on a DBAPI connection.

    Emits ``SET SESSION TRANSACTION ISOLATION LEVEL <level>`` followed by
    ``COMMIT``.  The cursor is closed even when a statement fails.
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute(f"SET SESSION TRANSACTION ISOLATION LEVEL {level}")
        cursor.execute("COMMIT")
    finally:
        cursor.close()
2801
def get_isolation_level(
    self, dbapi_connection: DBAPIConnection
) -> IsolationLevel:
    """Return the current transaction isolation level of a connection.

    Reads ``@@transaction_isolation`` on MySQL 5.7.20 and later and
    ``@@tx_isolation`` otherwise.  The value is upper-cased and its
    dashes are replaced with spaces.  The cursor is closed on every
    path, including errors.

    :raises NotImplementedError: when the server returns no row.
    """
    cursor = dbapi_connection.cursor()
    try:
        if self._is_mysql and self.server_version_info >= (5, 7, 20):
            cursor.execute("SELECT @@transaction_isolation")
        else:
            cursor.execute("SELECT @@tx_isolation")
        row = cursor.fetchone()
    finally:
        cursor.close()

    if row is None:
        util.warn(
            "Could not retrieve transaction isolation level for MySQL "
            "connection."
        )
        raise NotImplementedError()
    val = row[0]
    if isinstance(val, bytes):
        val = val.decode()
    return val.upper().replace("-", " ")  # type: ignore[no-any-return]
2822
@classmethod
def _is_mariadb_from_url(cls, url: URL) -> bool:
    """Connect to the database at ``url`` and report whether its
    ``VERSION()`` string contains ``MariaDB``.

    A throwaway dialect instance and connection are created for the
    check; the connection is closed whether or not the query succeeds.
    """
    probe_dialect = cls(dbapi=cls.import_dbapi())

    cargs, cparams = probe_dialect.create_connect_args(url)
    conn = probe_dialect.connect(*cargs, **cparams)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT VERSION() LIKE '%MariaDB%'")
        matched = cursor.fetchone()[0]  # type: ignore[index]
        return bool(matched)
    finally:
        conn.close()
2840
2841 def _get_server_version_info(
2842 self, connection: Connection
2843 ) -> Tuple[int, ...]:
2844 # get database server version info explicitly over the wire
2845 # to avoid proxy servers like MaxScale getting in the
2846 # way with their own values, see #4205
2847 dbapi_con = connection.connection
2848 cursor = dbapi_con.cursor()
2849 cursor.execute("SELECT VERSION()")
2850
2851 val = cursor.fetchone()[0] # type: ignore[index]
2852 cursor.close()
2853 if isinstance(val, bytes):
2854 val = val.decode()
2855
2856 return self._parse_server_version(val)
2857
2858 def _parse_server_version(self, val: str) -> Tuple[int, ...]:
2859 version: List[int] = []
2860 is_mariadb = False
2861
2862 r = re.compile(r"[.\-+]")
2863 tokens = r.split(val)
2864 for token in tokens:
2865 parsed_token = re.match(
2866 r"^(?:(\d+)(?:a|b|c)?|(MariaDB\w*))$", token
2867 )
2868 if not parsed_token:
2869 continue
2870 elif parsed_token.group(2):
2871 self._mariadb_normalized_version_info = tuple(version[-3:])
2872 is_mariadb = True
2873 else:
2874 digit = int(parsed_token.group(1))
2875 version.append(digit)
2876
2877 server_version_info = tuple(version)
2878
2879 self._set_mariadb(
2880 bool(server_version_info and is_mariadb), server_version_info
2881 )
2882
2883 if not is_mariadb:
2884 self._mariadb_normalized_version_info = server_version_info
2885
2886 if server_version_info < (5, 0, 2):
2887 raise NotImplementedError(
2888 "the MySQL/MariaDB dialect supports server "
2889 "version info 5.0.2 and above."
2890 )
2891
2892 # setting it here to help w the test suite
2893 self.server_version_info = server_version_info
2894 return server_version_info
2895
2896 def _set_mariadb(
2897 self, is_mariadb: Optional[bool], server_version_info: Tuple[int, ...]
2898 ) -> None:
2899 if is_mariadb is None:
2900 return
2901
2902 if not is_mariadb and self.is_mariadb:
2903 raise exc.InvalidRequestError(
2904 "MySQL version %s is not a MariaDB variant."
2905 % (".".join(map(str, server_version_info)),)
2906 )
2907 if is_mariadb:
2908
2909 if not issubclass(self.preparer, MariaDBIdentifierPreparer):
2910 self.preparer = MariaDBIdentifierPreparer
2911 # this would have been set by the default dialect already,
2912 # so set it again
2913 self.identifier_preparer = self.preparer(self)
2914
2915 # this will be updated on first connect in initialize()
2916 # if using older mariadb version
2917 self.delete_returning = True
2918 self.insert_returning = True
2919
2920 self.is_mariadb = is_mariadb
2921
2922 def do_begin_twophase(self, connection: Connection, xid: Any) -> None:
2923 connection.execute(sql.text("XA BEGIN :xid"), dict(xid=xid))
2924
2925 def do_prepare_twophase(self, connection: Connection, xid: Any) -> None:
2926 connection.execute(sql.text("XA END :xid"), dict(xid=xid))
2927 connection.execute(sql.text("XA PREPARE :xid"), dict(xid=xid))
2928
2929 def do_rollback_twophase(
2930 self,
2931 connection: Connection,
2932 xid: Any,
2933 is_prepared: bool = True,
2934 recover: bool = False,
2935 ) -> None:
2936 if not is_prepared:
2937 connection.execute(sql.text("XA END :xid"), dict(xid=xid))
2938 connection.execute(sql.text("XA ROLLBACK :xid"), dict(xid=xid))
2939
2940 def do_commit_twophase(
2941 self,
2942 connection: Connection,
2943 xid: Any,
2944 is_prepared: bool = True,
2945 recover: bool = False,
2946 ) -> None:
2947 if not is_prepared:
2948 self.do_prepare_twophase(connection, xid)
2949 connection.execute(sql.text("XA COMMIT :xid"), dict(xid=xid))
2950
2951 def do_recover_twophase(self, connection: Connection) -> List[Any]:
2952 resultset = connection.exec_driver_sql("XA RECOVER")
2953 return [
2954 row["data"][0 : row["gtrid_length"]]
2955 for row in resultset.mappings()
2956 ]
2957
2958 def is_disconnect(
2959 self,
2960 e: DBAPIModule.Error,
2961 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
2962 cursor: Optional[DBAPICursor],
2963 ) -> bool:
2964 if isinstance(
2965 e,
2966 (
2967 self.dbapi.OperationalError, # type: ignore
2968 self.dbapi.ProgrammingError, # type: ignore
2969 self.dbapi.InterfaceError, # type: ignore
2970 ),
2971 ) and self._extract_error_code(e) in (
2972 1927,
2973 2006,
2974 2013,
2975 2014,
2976 2045,
2977 2055,
2978 4031,
2979 ):
2980 return True
2981 elif isinstance(
2982 e, (self.dbapi.InterfaceError, self.dbapi.InternalError) # type: ignore # noqa: E501
2983 ):
2984 # if underlying connection is closed,
2985 # this is the error you get
2986 return "(0, '')" in str(e)
2987 else:
2988 return False
2989
2990 def _compat_fetchall(
2991 self, rp: CursorResult[Any], charset: Optional[str] = None
2992 ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]:
2993 """Proxy result rows to smooth over MySQL-Python driver
2994 inconsistencies."""
2995
2996 return [_DecodingRow(row, charset) for row in rp.fetchall()]
2997
2998 def _compat_fetchone(
2999 self, rp: CursorResult[Any], charset: Optional[str] = None
3000 ) -> Union[Row[Any], None, _DecodingRow]:
3001 """Proxy a result row to smooth over MySQL-Python driver
3002 inconsistencies."""
3003
3004 row = rp.fetchone()
3005 if row:
3006 return _DecodingRow(row, charset)
3007 else:
3008 return None
3009
3010 def _compat_first(
3011 self, rp: CursorResult[Any], charset: Optional[str] = None
3012 ) -> Optional[_DecodingRow]:
3013 """Proxy a result row to smooth over MySQL-Python driver
3014 inconsistencies."""
3015
3016 row = rp.first()
3017 if row:
3018 return _DecodingRow(row, charset)
3019 else:
3020 return None
3021
3022 def _extract_error_code(
3023 self, exception: DBAPIModule.Error
3024 ) -> Optional[int]:
3025 raise NotImplementedError()
3026
3027 def _get_default_schema_name(self, connection: Connection) -> str:
3028 return connection.exec_driver_sql("SELECT DATABASE()").scalar() # type: ignore[return-value] # noqa: E501
3029
3030 @reflection.cache
3031 def has_table(
3032 self,
3033 connection: Connection,
3034 table_name: str,
3035 schema: Optional[str] = None,
3036 **kw: Any,
3037 ) -> bool:
3038 self._ensure_has_table_connection(connection)
3039
3040 if schema is None:
3041 schema = self.default_schema_name
3042
3043 assert schema is not None
3044
3045 full_name = ".".join(
3046 self.identifier_preparer._quote_free_identifiers(
3047 schema, table_name
3048 )
3049 )
3050
3051 # DESCRIBE *must* be used because there is no information schema
3052 # table that returns information on temp tables that is consistently
3053 # available on MariaDB / MySQL / engine-agnostic etc.
3054 # therefore we have no choice but to use DESCRIBE and an error catch
3055 # to detect "False". See issue #9058
3056
3057 try:
3058 with connection.exec_driver_sql(
3059 f"DESCRIBE {full_name}",
3060 execution_options={"skip_user_error_events": True},
3061 ) as rs:
3062 return rs.fetchone() is not None
3063 except exc.DBAPIError as e:
3064 # https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html # noqa: E501
3065 # there are a lot of codes that *may* pop up here at some point
3066 # but we continue to be fairly conservative. We include:
3067 # 1146: Table '%s.%s' doesn't exist - what every MySQL has emitted
3068 # for decades
3069 #
3070 # mysql 8 suddenly started emitting:
3071 # 1049: Unknown database '%s' - for nonexistent schema
3072 #
3073 # also added:
3074 # 1051: Unknown table '%s' - not known to emit
3075 #
3076 # there's more "doesn't exist" kinds of messages but they are
3077 # less clear if mysql 8 would suddenly start using one of those
3078 if self._extract_error_code(e.orig) in (1146, 1049, 1051): # type: ignore # noqa: E501
3079 return False
3080 raise
3081
3082 @reflection.cache
3083 def has_sequence(
3084 self,
3085 connection: Connection,
3086 sequence_name: str,
3087 schema: Optional[str] = None,
3088 **kw: Any,
3089 ) -> bool:
3090 if not self.supports_sequences:
3091 self._sequences_not_supported()
3092 if not schema:
3093 schema = self.default_schema_name
3094 # MariaDB implements sequences as a special type of table
3095 #
3096 cursor = connection.execute(
3097 sql.text(
3098 "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
3099 "WHERE TABLE_TYPE='SEQUENCE' and TABLE_NAME=:name AND "
3100 "TABLE_SCHEMA=:schema_name"
3101 ),
3102 dict(
3103 name=str(sequence_name),
3104 schema_name=str(schema),
3105 ),
3106 )
3107 return cursor.first() is not None
3108
3109 def _sequences_not_supported(self) -> NoReturn:
3110 raise NotImplementedError(
3111 "Sequences are supported only by the "
3112 "MariaDB series 10.3 or greater"
3113 )
3114
3115 @reflection.cache
3116 def get_sequence_names(
3117 self, connection: Connection, schema: Optional[str] = None, **kw: Any
3118 ) -> List[str]:
3119 if not self.supports_sequences:
3120 self._sequences_not_supported()
3121 if not schema:
3122 schema = self.default_schema_name
3123 # MariaDB implements sequences as a special type of table
3124 cursor = connection.execute(
3125 sql.text(
3126 "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
3127 "WHERE TABLE_TYPE='SEQUENCE' and TABLE_SCHEMA=:schema_name"
3128 ),
3129 dict(schema_name=schema),
3130 )
3131 return [
3132 row[0]
3133 for row in self._compat_fetchall(
3134 cursor, charset=self._connection_charset
3135 )
3136 ]
3137
3138 def initialize(self, connection: Connection) -> None:
3139 # this is driver-based, does not need server version info
3140 # and is fairly critical for even basic SQL operations
3141 self._connection_charset: Optional[str] = self._detect_charset(
3142 connection
3143 )
3144
3145 # call super().initialize() because we need to have
3146 # server_version_info set up. in 1.4 under python 2 only this does the
3147 # "check unicode returns" thing, which is the one area that some
3148 # SQL gets compiled within initialize() currently
3149 default.DefaultDialect.initialize(self, connection)
3150
3151 self._detect_sql_mode(connection)
3152 self._detect_ansiquotes(connection) # depends on sql mode
3153 self._detect_casing(connection)
3154 if self._server_ansiquotes:
3155 # if ansiquotes == True, build a new IdentifierPreparer
3156 # with the new setting
3157 self.identifier_preparer = self.preparer(
3158 self, server_ansiquotes=self._server_ansiquotes
3159 )
3160
3161 self.supports_sequences = (
3162 self.is_mariadb and self.server_version_info >= (10, 3)
3163 )
3164
3165 self.supports_for_update_of = (
3166 self._is_mysql and self.server_version_info >= (8,)
3167 )
3168
3169 self.use_mysql_for_share = (
3170 self._is_mysql and self.server_version_info >= (8, 0, 1)
3171 )
3172
3173 self._needs_correct_for_88718_96365 = (
3174 not self.is_mariadb and self.server_version_info >= (8,)
3175 )
3176
3177 self.delete_returning = (
3178 self.is_mariadb and self.server_version_info >= (10, 0, 5)
3179 )
3180
3181 self.insert_returning = (
3182 self.is_mariadb and self.server_version_info >= (10, 5)
3183 )
3184
3185 self._requires_alias_for_on_duplicate_key = (
3186 self._is_mysql and self.server_version_info >= (8, 0, 20)
3187 )
3188
3189 self._warn_for_known_db_issues()
3190
3191 def _warn_for_known_db_issues(self) -> None:
3192 if self.is_mariadb:
3193 mdb_version = self._mariadb_normalized_version_info
3194 assert mdb_version is not None
3195 if mdb_version > (10, 2) and mdb_version < (10, 2, 9):
3196 util.warn(
3197 "MariaDB %r before 10.2.9 has known issues regarding "
3198 "CHECK constraints, which impact handling of NULL values "
3199 "with SQLAlchemy's boolean datatype (MDEV-13596). An "
3200 "additional issue prevents proper migrations of columns "
3201 "with CHECK constraints (MDEV-11114). Please upgrade to "
3202 "MariaDB 10.2.9 or greater, or use the MariaDB 10.1 "
3203 "series, to avoid these issues." % (mdb_version,)
3204 )
3205
3206 @property
3207 def _support_float_cast(self) -> bool:
3208 if not self.server_version_info:
3209 return False
3210 elif self.is_mariadb:
3211 # ref https://mariadb.com/kb/en/mariadb-1045-release-notes/
3212 return self.server_version_info >= (10, 4, 5)
3213 else:
3214 # ref https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-17.html#mysqld-8-0-17-feature # noqa
3215 return self.server_version_info >= (8, 0, 17)
3216
3217 @property
3218 def _support_default_function(self) -> bool:
3219 if not self.server_version_info:
3220 return False
3221 elif self.is_mariadb:
3222 # ref https://mariadb.com/kb/en/mariadb-1021-release-notes/
3223 return self.server_version_info >= (10, 2, 1)
3224 else:
3225 # ref https://dev.mysql.com/doc/refman/8.0/en/data-type-defaults.html # noqa
3226 return self.server_version_info >= (8, 0, 13)
3227
3228 @property
3229 def _is_mariadb(self) -> bool:
3230 return self.is_mariadb
3231
3232 @property
3233 def _is_mysql(self) -> bool:
3234 return not self.is_mariadb
3235
3236 @property
3237 def _is_mariadb_102(self) -> bool:
3238 return (
3239 self.is_mariadb
3240 and self._mariadb_normalized_version_info # type:ignore[operator]
3241 > (
3242 10,
3243 2,
3244 )
3245 )
3246
3247 @reflection.cache
3248 def get_schema_names(self, connection: Connection, **kw: Any) -> List[str]:
3249 rp = connection.exec_driver_sql("SHOW schemas")
3250 return [r[0] for r in rp]
3251
3252 @reflection.cache
3253 def get_table_names(
3254 self, connection: Connection, schema: Optional[str] = None, **kw: Any
3255 ) -> List[str]:
3256 """Return a Unicode SHOW TABLES from a given schema."""
3257 if schema is not None:
3258 current_schema: str = schema
3259 else:
3260 current_schema = self.default_schema_name # type: ignore
3261
3262 charset = self._connection_charset
3263
3264 rp = connection.exec_driver_sql(
3265 "SHOW FULL TABLES FROM %s"
3266 % self.identifier_preparer.quote_identifier(current_schema)
3267 )
3268
3269 return [
3270 row[0]
3271 for row in self._compat_fetchall(rp, charset=charset)
3272 if row[1] == "BASE TABLE"
3273 ]
3274
3275 @reflection.cache
3276 def get_view_names(
3277 self, connection: Connection, schema: Optional[str] = None, **kw: Any
3278 ) -> List[str]:
3279 if schema is None:
3280 schema = self.default_schema_name
3281 assert schema is not None
3282 charset = self._connection_charset
3283 rp = connection.exec_driver_sql(
3284 "SHOW FULL TABLES FROM %s"
3285 % self.identifier_preparer.quote_identifier(schema)
3286 )
3287 return [
3288 row[0]
3289 for row in self._compat_fetchall(rp, charset=charset)
3290 if row[1] in ("VIEW", "SYSTEM VIEW")
3291 ]
3292
3293 @reflection.cache
3294 def get_table_options(
3295 self,
3296 connection: Connection,
3297 table_name: str,
3298 schema: Optional[str] = None,
3299 **kw: Any,
3300 ) -> Dict[str, Any]:
3301 parsed_state = self._parsed_state_or_create(
3302 connection, table_name, schema, **kw
3303 )
3304 if parsed_state.table_options:
3305 return parsed_state.table_options
3306 else:
3307 return ReflectionDefaults.table_options()
3308
3309 @reflection.cache
3310 def get_columns(
3311 self,
3312 connection: Connection,
3313 table_name: str,
3314 schema: Optional[str] = None,
3315 **kw: Any,
3316 ) -> List[ReflectedColumn]:
3317 parsed_state = self._parsed_state_or_create(
3318 connection, table_name, schema, **kw
3319 )
3320 if parsed_state.columns:
3321 return parsed_state.columns
3322 else:
3323 return ReflectionDefaults.columns()
3324
3325 @reflection.cache
3326 def get_pk_constraint(
3327 self,
3328 connection: Connection,
3329 table_name: str,
3330 schema: Optional[str] = None,
3331 **kw: Any,
3332 ) -> ReflectedPrimaryKeyConstraint:
3333 parsed_state = self._parsed_state_or_create(
3334 connection, table_name, schema, **kw
3335 )
3336 for key in parsed_state.keys:
3337 if key["type"] == "PRIMARY":
3338 # There can be only one.
3339 cols = [s[0] for s in key["columns"]]
3340 return {"constrained_columns": cols, "name": None}
3341 return ReflectionDefaults.pk_constraint()
3342
3343 @reflection.cache
3344 def get_foreign_keys(
3345 self,
3346 connection: Connection,
3347 table_name: str,
3348 schema: Optional[str] = None,
3349 **kw: Any,
3350 ) -> List[ReflectedForeignKeyConstraint]:
3351 parsed_state = self._parsed_state_or_create(
3352 connection, table_name, schema, **kw
3353 )
3354 default_schema = None
3355
3356 fkeys: List[ReflectedForeignKeyConstraint] = []
3357
3358 for spec in parsed_state.fk_constraints:
3359 ref_name = spec["table"][-1]
3360 ref_schema = len(spec["table"]) > 1 and spec["table"][-2] or schema
3361
3362 if not ref_schema:
3363 if default_schema is None:
3364 default_schema = connection.dialect.default_schema_name
3365 if schema == default_schema:
3366 ref_schema = schema
3367
3368 loc_names = spec["local"]
3369 ref_names = spec["foreign"]
3370
3371 con_kw = {}
3372 for opt in ("onupdate", "ondelete"):
3373 if spec.get(opt, False) not in ("NO ACTION", None):
3374 con_kw[opt] = spec[opt]
3375
3376 fkey_d: ReflectedForeignKeyConstraint = {
3377 "name": spec["name"],
3378 "constrained_columns": loc_names,
3379 "referred_schema": ref_schema,
3380 "referred_table": ref_name,
3381 "referred_columns": ref_names,
3382 "options": con_kw,
3383 }
3384 fkeys.append(fkey_d)
3385
3386 if self._needs_correct_for_88718_96365:
3387 self._correct_for_mysql_bugs_88718_96365(fkeys, connection)
3388
3389 return fkeys if fkeys else ReflectionDefaults.foreign_keys()
3390
    def _correct_for_mysql_bugs_88718_96365(
        self,
        fkeys: List[ReflectedForeignKeyConstraint],
        connection: Connection,
    ) -> None:
        """Rewrite the referred names in ``fkeys`` with their stored casing.

        The referred schema, table and column names of every entry are
        looked up in ``information_schema.columns`` and replaced, in
        place, by the names found there. Nothing is returned.

        :param fkeys: reflected foreign key dictionaries; mutated in place.
        :param connection: connection used to query the information
         schema and to read the default schema name.
        """
        # Foreign key is always in lower case (MySQL 8.0)
        # https://bugs.mysql.com/bug.php?id=88718
        # issue #4344 for SQLAlchemy

        # table name also for MySQL 8.0
        # https://bugs.mysql.com/bug.php?id=96365
        # issue #4751 for SQLAlchemy

        # for lower_case_table_names=2, information_schema.columns
        # preserves the original table/schema casing, but SHOW CREATE
        # TABLE does not.   this problem is not in lower_case_table_names=1,
        # but use case-insensitive matching for these two modes in any case.

        if self._casing in (1, 2):

            def lower(s: str) -> str:
                return s.lower()

        else:
            # if on case sensitive, there can be two tables referenced
            # with the same name different casing, so we need to use
            # case-sensitive matching.
            def lower(s: str) -> str:
                return s

        default_schema_name: str = connection.dialect.default_schema_name  # type: ignore  # noqa: E501

        # NOTE: using (table_schema, table_name, lower(column_name)) in (...)
        # is very slow since mysql does not seem able to properly use indexes.
        # Unpack the where condition instead.
        # mapping: schema -> table -> list of referred column names
        schema_by_table_by_column: DefaultDict[
            str, DefaultDict[str, List[str]]
        ] = DefaultDict(lambda: DefaultDict(list))
        for rec in fkeys:
            sch = lower(rec["referred_schema"] or default_schema_name)
            tbl = lower(rec["referred_table"])
            for col_name in rec["referred_columns"]:
                schema_by_table_by_column[sch][tbl].append(col_name)

        if schema_by_table_by_column:

            # one OR branch per schema; inside it, one OR branch per table
            # that restricts the lower-cased column name to the referred
            # columns of that table
            condition = sql.or_(
                *(
                    sql.and_(
                        _info_columns.c.table_schema == schema,
                        sql.or_(
                            *(
                                sql.and_(
                                    _info_columns.c.table_name == table,
                                    sql.func.lower(
                                        _info_columns.c.column_name
                                    ).in_(columns),
                                )
                                for table, columns in tables.items()
                            )
                        ),
                    )
                    for schema, tables in schema_by_table_by_column.items()
                )
            )

            select = sql.select(
                _info_columns.c.table_schema,
                _info_columns.c.table_name,
                _info_columns.c.column_name,
            ).where(condition)

            correct_for_wrong_fk_case: CursorResult[Tuple[str, str, str]] = (
                connection.execute(select)
            )

            # in casing=0, table name and schema name come back in their
            # exact case.
            # in casing=1, table name and schema name come back in lower
            # case.
            # in casing=2, table name and schema name come back from the
            # information_schema.columns view in the case
            # that was used in CREATE DATABASE and CREATE TABLE, but
            # SHOW CREATE TABLE converts them to *lower case*, therefore
            # not matching.  So for this case, case-insensitive lookup
            # is necessary
            # keyed by (schema, table) as normalized by lower(); each value
            # maps "SCHEMANAME", "TABLENAME" and every lower-cased column
            # name to the spelling returned by the query
            d: DefaultDict[Tuple[str, str], Dict[str, str]] = defaultdict(dict)
            for schema, tname, cname in correct_for_wrong_fk_case:
                d[(lower(schema), lower(tname))]["SCHEMANAME"] = schema
                d[(lower(schema), lower(tname))]["TABLENAME"] = tname
                d[(lower(schema), lower(tname))][cname.lower()] = cname

            for fkey in fkeys:
                rec_b = d[
                    (
                        lower(fkey["referred_schema"] or default_schema_name),
                        lower(fkey["referred_table"]),
                    )
                ]

                fkey["referred_table"] = rec_b["TABLENAME"]
                # a referred schema of None is left as None
                if fkey["referred_schema"] is not None:
                    fkey["referred_schema"] = rec_b["SCHEMANAME"]

                fkey["referred_columns"] = [
                    rec_b[col.lower()] for col in fkey["referred_columns"]
                ]
3498
3499 @reflection.cache
3500 def get_check_constraints(
3501 self,
3502 connection: Connection,
3503 table_name: str,
3504 schema: Optional[str] = None,
3505 **kw: Any,
3506 ) -> List[ReflectedCheckConstraint]:
3507 parsed_state = self._parsed_state_or_create(
3508 connection, table_name, schema, **kw
3509 )
3510
3511 cks: List[ReflectedCheckConstraint] = [
3512 {"name": spec["name"], "sqltext": spec["sqltext"]}
3513 for spec in parsed_state.ck_constraints
3514 ]
3515 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
3516 return cks if cks else ReflectionDefaults.check_constraints()
3517
3518 @reflection.cache
3519 def get_table_comment(
3520 self,
3521 connection: Connection,
3522 table_name: str,
3523 schema: Optional[str] = None,
3524 **kw: Any,
3525 ) -> ReflectedTableComment:
3526 parsed_state = self._parsed_state_or_create(
3527 connection, table_name, schema, **kw
3528 )
3529 comment = parsed_state.table_options.get(f"{self.name}_comment", None)
3530 if comment is not None:
3531 return {"text": comment}
3532 else:
3533 return ReflectionDefaults.table_comment()
3534
3535 @reflection.cache
3536 def get_indexes(
3537 self,
3538 connection: Connection,
3539 table_name: str,
3540 schema: Optional[str] = None,
3541 **kw: Any,
3542 ) -> List[ReflectedIndex]:
3543 parsed_state = self._parsed_state_or_create(
3544 connection, table_name, schema, **kw
3545 )
3546
3547 indexes: List[ReflectedIndex] = []
3548
3549 for spec in parsed_state.keys:
3550 dialect_options = {}
3551 unique = False
3552 flavor = spec["type"]
3553 if flavor == "PRIMARY":
3554 continue
3555 if flavor == "UNIQUE":
3556 unique = True
3557 elif flavor in ("FULLTEXT", "SPATIAL"):
3558 dialect_options["%s_prefix" % self.name] = flavor
3559 elif flavor is not None:
3560 util.warn(
3561 "Converting unknown KEY type %s to a plain KEY", flavor
3562 )
3563
3564 if spec["parser"]:
3565 dialect_options["%s_with_parser" % (self.name)] = spec[
3566 "parser"
3567 ]
3568
3569 index_d: ReflectedIndex = {
3570 "name": spec["name"],
3571 "column_names": [s[0] for s in spec["columns"]],
3572 "unique": unique,
3573 }
3574
3575 mysql_length = {
3576 s[0]: s[1] for s in spec["columns"] if s[1] is not None
3577 }
3578 if mysql_length:
3579 dialect_options["%s_length" % self.name] = mysql_length
3580
3581 if flavor:
3582 index_d["type"] = flavor # type: ignore[typeddict-unknown-key]
3583
3584 if dialect_options:
3585 index_d["dialect_options"] = dialect_options
3586
3587 indexes.append(index_d)
3588 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
3589 return indexes if indexes else ReflectionDefaults.indexes()
3590
3591 @reflection.cache
3592 def get_unique_constraints(
3593 self,
3594 connection: Connection,
3595 table_name: str,
3596 schema: Optional[str] = None,
3597 **kw: Any,
3598 ) -> List[ReflectedUniqueConstraint]:
3599 parsed_state = self._parsed_state_or_create(
3600 connection, table_name, schema, **kw
3601 )
3602
3603 ucs: List[ReflectedUniqueConstraint] = [
3604 {
3605 "name": key["name"],
3606 "column_names": [col[0] for col in key["columns"]],
3607 "duplicates_index": key["name"],
3608 }
3609 for key in parsed_state.keys
3610 if key["type"] == "UNIQUE"
3611 ]
3612 ucs.sort(key=lambda d: d["name"] or "~") # sort None as last
3613 if ucs:
3614 return ucs
3615 else:
3616 return ReflectionDefaults.unique_constraints()
3617
3618 @reflection.cache
3619 def get_view_definition(
3620 self,
3621 connection: Connection,
3622 view_name: str,
3623 schema: Optional[str] = None,
3624 **kw: Any,
3625 ) -> str:
3626 charset = self._connection_charset
3627 full_name = ".".join(
3628 self.identifier_preparer._quote_free_identifiers(schema, view_name)
3629 )
3630 sql = self._show_create_table(
3631 connection, None, charset, full_name=full_name
3632 )
3633 if sql.upper().startswith("CREATE TABLE"):
3634 # it's a table, not a view
3635 raise exc.NoSuchTableError(full_name)
3636 return sql
3637
3638 def _parsed_state_or_create(
3639 self,
3640 connection: Connection,
3641 table_name: str,
3642 schema: Optional[str] = None,
3643 **kw: Any,
3644 ) -> _reflection.ReflectedState:
3645 return self._setup_parser(
3646 connection,
3647 table_name,
3648 schema,
3649 info_cache=kw.get("info_cache", None),
3650 )
3651
3652 @util.memoized_property
3653 def _tabledef_parser(self) -> _reflection.MySQLTableDefinitionParser:
3654 """return the MySQLTableDefinitionParser, generate if needed.
3655
3656 The deferred creation ensures that the dialect has
3657 retrieved server version information first.
3658
3659 """
3660 preparer = self.identifier_preparer
3661 return _reflection.MySQLTableDefinitionParser(self, preparer)
3662
3663 @reflection.cache
3664 def _setup_parser(
3665 self,
3666 connection: Connection,
3667 table_name: str,
3668 schema: Optional[str] = None,
3669 **kw: Any,
3670 ) -> _reflection.ReflectedState:
3671 charset = self._connection_charset
3672 parser = self._tabledef_parser
3673 full_name = ".".join(
3674 self.identifier_preparer._quote_free_identifiers(
3675 schema, table_name
3676 )
3677 )
3678 sql = self._show_create_table(
3679 connection, None, charset, full_name=full_name
3680 )
3681 if parser._check_view(sql):
3682 # Adapt views to something table-like.
3683 columns = self._describe_table(
3684 connection, None, charset, full_name=full_name
3685 )
3686 sql = parser._describe_to_create(
3687 table_name, columns # type: ignore[arg-type]
3688 )
3689 return parser.parse(sql, charset)
3690
3691 def _fetch_setting(
3692 self, connection: Connection, setting_name: str
3693 ) -> Optional[str]:
3694 charset = self._connection_charset
3695
3696 if self.server_version_info and self.server_version_info < (5, 6):
3697 sql = "SHOW VARIABLES LIKE '%s'" % setting_name
3698 fetch_col = 1
3699 else:
3700 sql = "SELECT @@%s" % setting_name
3701 fetch_col = 0
3702
3703 show_var = connection.exec_driver_sql(sql)
3704 row = self._compat_first(show_var, charset=charset)
3705 if not row:
3706 return None
3707 else:
3708 return cast(Optional[str], row[fetch_col])
3709
3710 def _detect_charset(self, connection: Connection) -> str:
3711 raise NotImplementedError()
3712
3713 def _detect_casing(self, connection: Connection) -> int:
3714 """Sniff out identifier case sensitivity.
3715
3716 Cached per-connection. This value can not change without a server
3717 restart.
3718
3719 """
3720 # https://dev.mysql.com/doc/refman/en/identifier-case-sensitivity.html
3721
3722 setting = self._fetch_setting(connection, "lower_case_table_names")
3723 if setting is None:
3724 cs = 0
3725 else:
3726 # 4.0.15 returns OFF or ON according to [ticket:489]
3727 # 3.23 doesn't, 4.0.27 doesn't..
3728 if setting == "OFF":
3729 cs = 0
3730 elif setting == "ON":
3731 cs = 1
3732 else:
3733 cs = int(setting)
3734 self._casing = cs
3735 return cs
3736
3737 def _detect_collations(self, connection: Connection) -> Dict[str, str]:
3738 """Pull the active COLLATIONS list from the server.
3739
3740 Cached per-connection.
3741 """
3742
3743 collations = {}
3744 charset = self._connection_charset
3745 rs = connection.exec_driver_sql("SHOW COLLATION")
3746 for row in self._compat_fetchall(rs, charset):
3747 collations[row[0]] = row[1]
3748 return collations
3749
3750 def _detect_sql_mode(self, connection: Connection) -> None:
3751 setting = self._fetch_setting(connection, "sql_mode")
3752
3753 if setting is None:
3754 util.warn(
3755 "Could not retrieve SQL_MODE; please ensure the "
3756 "MySQL user has permissions to SHOW VARIABLES"
3757 )
3758 self._sql_mode = ""
3759 else:
3760 self._sql_mode = setting or ""
3761
3762 def _detect_ansiquotes(self, connection: Connection) -> None:
3763 """Detect and adjust for the ANSI_QUOTES sql mode."""
3764
3765 mode = self._sql_mode
3766 if not mode:
3767 mode = ""
3768 elif mode.isdigit():
3769 mode_no = int(mode)
3770 mode = (mode_no | 4 == mode_no) and "ANSI_QUOTES" or ""
3771
3772 self._server_ansiquotes = "ANSI_QUOTES" in mode
3773
3774 # as of MySQL 5.0.1
3775 self._backslash_escapes = "NO_BACKSLASH_ESCAPES" not in mode
3776
3777 @overload
3778 def _show_create_table(
3779 self,
3780 connection: Connection,
3781 table: Optional[Table],
3782 charset: Optional[str],
3783 full_name: str,
3784 ) -> str: ...
3785
3786 @overload
3787 def _show_create_table(
3788 self,
3789 connection: Connection,
3790 table: Table,
3791 charset: Optional[str] = None,
3792 full_name: None = None,
3793 ) -> str: ...
3794
3795 def _show_create_table(
3796 self,
3797 connection: Connection,
3798 table: Optional[Table],
3799 charset: Optional[str] = None,
3800 full_name: Optional[str] = None,
3801 ) -> str:
3802 """Run SHOW CREATE TABLE for a ``Table``."""
3803
3804 if full_name is None:
3805 assert table is not None
3806 full_name = self.identifier_preparer.format_table(table)
3807 st = "SHOW CREATE TABLE %s" % full_name
3808
3809 try:
3810 rp = connection.execution_options(
3811 skip_user_error_events=True
3812 ).exec_driver_sql(st)
3813 except exc.DBAPIError as e:
3814 if self._extract_error_code(e.orig) == 1146: # type: ignore[arg-type] # noqa: E501
3815 raise exc.NoSuchTableError(full_name) from e
3816 else:
3817 raise
3818 row = self._compat_first(rp, charset=charset)
3819 if not row:
3820 raise exc.NoSuchTableError(full_name)
3821 return cast(str, row[1]).strip()
3822
3823 @overload
3824 def _describe_table(
3825 self,
3826 connection: Connection,
3827 table: Optional[Table],
3828 charset: Optional[str],
3829 full_name: str,
3830 ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]: ...
3831
3832 @overload
3833 def _describe_table(
3834 self,
3835 connection: Connection,
3836 table: Table,
3837 charset: Optional[str] = None,
3838 full_name: None = None,
3839 ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]: ...
3840
3841 def _describe_table(
3842 self,
3843 connection: Connection,
3844 table: Optional[Table],
3845 charset: Optional[str] = None,
3846 full_name: Optional[str] = None,
3847 ) -> Union[Sequence[Row[Any]], Sequence[_DecodingRow]]:
3848 """Run DESCRIBE for a ``Table`` and return processed rows."""
3849
3850 if full_name is None:
3851 assert table is not None
3852 full_name = self.identifier_preparer.format_table(table)
3853 st = "DESCRIBE %s" % full_name
3854
3855 rp, rows = None, None
3856 try:
3857 try:
3858 rp = connection.execution_options(
3859 skip_user_error_events=True
3860 ).exec_driver_sql(st)
3861 except exc.DBAPIError as e:
3862 code = self._extract_error_code(e.orig) # type: ignore[arg-type] # noqa: E501
3863 if code == 1146:
3864 raise exc.NoSuchTableError(full_name) from e
3865
3866 elif code == 1356:
3867 raise exc.UnreflectableTableError(
3868 "Table or view named %s could not be "
3869 "reflected: %s" % (full_name, e)
3870 ) from e
3871
3872 else:
3873 raise
3874 rows = self._compat_fetchall(rp, charset=charset)
3875 finally:
3876 if rp:
3877 rp.close()
3878 return rows
3879
3880
3881class _DecodingRow:
3882 """Return unicode-decoded values based on type inspection.
3883
3884 Smooth over data type issues (esp. with alpha driver versions) and
3885 normalize strings as Unicode regardless of user-configured driver
3886 encoding settings.
3887
3888 """
3889
3890 # Some MySQL-python versions can return some columns as
3891 # sets.Set(['value']) (seriously) but thankfully that doesn't
3892 # seem to come up in DDL queries.
3893
3894 _encoding_compat: Dict[str, str] = {
3895 "koi8r": "koi8_r",
3896 "koi8u": "koi8_u",
3897 "utf16": "utf-16-be", # MySQL's uft16 is always bigendian
3898 "utf8mb4": "utf8", # real utf8
3899 "utf8mb3": "utf8", # real utf8; saw this happen on CI but I cannot
3900 # reproduce, possibly mariadb10.6 related
3901 "eucjpms": "ujis",
3902 }
3903
3904 def __init__(self, rowproxy: Row[Any], charset: Optional[str]):
3905 self.rowproxy = rowproxy
3906 self.charset = (
3907 self._encoding_compat.get(charset, charset)
3908 if charset is not None
3909 else None
3910 )
3911
3912 def __getitem__(self, index: int) -> Any:
3913 item = self.rowproxy[index]
3914 if self.charset and isinstance(item, bytes):
3915 return item.decode(self.charset)
3916 else:
3917 return item
3918
3919 def __getattr__(self, attr: str) -> Any:
3920 item = getattr(self.rowproxy, attr)
3921 if self.charset and isinstance(item, bytes):
3922 return item.decode(self.charset)
3923 else:
3924 return item
3925
3926
# Lightweight table construct for ``information_schema.columns``, declaring
# only the three name columns that are selected and filtered on when the
# casing of foreign key targets is corrected.
_info_columns = sql.table(
    "columns",
    sql.column("table_schema", VARCHAR(64)),
    sql.column("table_name", VARCHAR(64)),
    sql.column("column_name", VARCHAR(64)),
    schema="information_schema",
)