JdbcMigrator.java

/*-
 * ========================LICENSE_START=================================
 * flyway-verb-migrate
 * ========================================================================
 * Copyright (C) 2010 - 2025 Red Gate Software Ltd
 * ========================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * =========================LICENSE_END==================================
 */
package org.flywaydb.verb.migrate.migrators;

import static org.flywaydb.nc.utils.VerbUtils.toMigrationText;

import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import lombok.CustomLog;
import org.flywaydb.core.ProgressLogger;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.LoadableMigrationInfo;
import org.flywaydb.core.api.MigrationInfo;
import org.flywaydb.core.api.MigrationState;
import org.flywaydb.core.api.callback.Event;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.output.CommandResultFactory;
import org.flywaydb.core.api.output.MigrateResult;
import org.flywaydb.core.api.resource.LoadableResource;
import org.flywaydb.core.internal.nc.ConnectionType;
import org.flywaydb.core.internal.nc.NativeConnectorsDatabase;
import org.flywaydb.core.internal.exception.FlywayMigrateException;
import org.flywaydb.core.internal.parser.Parser;
import org.flywaydb.core.internal.parser.ParsingContext;
import org.flywaydb.core.internal.sqlscript.SqlScriptMetadata;
import org.flywaydb.core.internal.sqlscript.SqlStatement;
import org.flywaydb.core.internal.sqlscript.SqlStatementIterator;
import org.flywaydb.core.internal.util.ExceptionUtils;
import org.flywaydb.core.internal.util.Pair;
import org.flywaydb.core.internal.util.StopWatch;
import org.flywaydb.core.internal.util.StringUtils;
import org.flywaydb.nc.callbacks.CallbackManager;
import org.flywaydb.nc.utils.ErrorUtils;
import org.flywaydb.core.internal.nc.Executor;
import org.flywaydb.nc.executors.ExecutorFactory;
import org.flywaydb.verb.migrate.MigrationExecutionGroup;
import org.flywaydb.core.internal.nc.Reader;
import org.flywaydb.nc.readers.ReaderFactory;

@CustomLog
public class JdbcMigrator extends Migrator {

    @Override
    public List<MigrationExecutionGroup> createGroups(final MigrationInfo[] allPendingMigrations,
        final Configuration configuration,
        final NativeConnectorsDatabase experimentalDatabase,
        final MigrateResult migrateResult,
        final ParsingContext parsingContext) {
        if (experimentalDatabase.getDatabaseMetaData().connectionType() != ConnectionType.JDBC) {
            return List.of(new MigrationExecutionGroup(List.of(allPendingMigrations), true));
        }
        final List<MigrationInfo> currentGroup = Arrays.asList(allPendingMigrations);
        final List<Pair<MigrationInfo, Boolean>> migrationTransactionPairs = currentGroup.stream()
            .map(x -> Pair.of(x, shouldExecuteInTransaction(x, configuration, experimentalDatabase, parsingContext)))
            .toList();
        if (!configuration.isGroup()) {
            return migrationTransactionPairs.stream()
                .map(x -> new MigrationExecutionGroup(List.of(x.getLeft()), x.getRight()))
                .toList();
        }

        for (final Pair<MigrationInfo, Boolean> pair : migrationTransactionPairs) {
            final MigrationInfo migrationInfo = pair.getLeft();
            final boolean shouldExecuteMigrationInTransaction = pair.getRight();
            if (configuration.isExecuteInTransaction() != shouldExecuteMigrationInTransaction) {
                if (configuration.isMixed()) {
                    return migrationTransactionPairs.stream()
                        .map(x -> new MigrationExecutionGroup(List.of(x.getLeft()), x.getRight()))
                        .toList();
                } else {
                    throw new FlywayMigrateException(migrationInfo,
                        "Detected both transactional and non-transactional migrations within the same migration group"
                            + " (even though mixed is false). First offending migration: "
                            + experimentalDatabase.doQuote((migrationInfo.isVersioned() ? migrationInfo.getVersion()
                            .getVersion() : "") + (StringUtils.hasLength(migrationInfo.getDescription()) ? " "
                            + migrationInfo.getDescription() : ""))
                            + (shouldExecuteMigrationInTransaction ? "" : " [non-transactional]"),
                        shouldExecuteMigrationInTransaction,
                        migrateResult);
                }
            }
        }
        if (!configuration.isExecuteInTransaction()) {
            return Arrays.stream(allPendingMigrations)
                .map(x -> new MigrationExecutionGroup(List.of(x), false))
                .toList();
        }

        final List<Pair<MigrationInfo, Boolean>> migrationContainsNonTransactionalStatements = currentGroup.stream()
            .map(x -> Pair.of(x,
                containsNonTransactionalStatements(configuration, experimentalDatabase, x, parsingContext)))
            .toList();
        for (final Pair<MigrationInfo, Boolean> pair : migrationContainsNonTransactionalStatements) {
            final MigrationInfo migrationInfo = pair.getLeft();
            if (migrationInfo instanceof final LoadableMigrationInfo loadableMigrationInfo) {
                final boolean containsNonTransactionalStatements = containsNonTransactionalStatements(configuration,
                    experimentalDatabase,
                    loadableMigrationInfo,
                    parsingContext);
                if (containsNonTransactionalStatements) {
                    if (configuration.isMixed()) {
                        return Arrays.stream(allPendingMigrations)
                            .map(x -> new MigrationExecutionGroup(List.of(x), pair.getRight()))
                            .toList();
                    }
                    throw new FlywayMigrateException(migrationInfo,
                        "Detected both transactional and non-transactional migrations within the same migration group"
                            + " (even though mixed is false). First offending migration: "
                            + experimentalDatabase.doQuote((migrationInfo.isVersioned() ? migrationInfo.getVersion()
                            .getVersion() : "") + (StringUtils.hasLength(migrationInfo.getDescription()) ? " "
                            + migrationInfo.getDescription() : ""))
                            + (" [non-transactional]"),
                        false,
                        migrateResult);
                }
            }
        }

        return List.of(new MigrationExecutionGroup(currentGroup, true));
    }

    @Override
    public int doExecutionGroup(final Configuration configuration,
        final MigrationExecutionGroup executionGroup,
        final NativeConnectorsDatabase experimentalDatabase,
        final MigrateResult migrateResult,
        final ParsingContext parsingContext,
        final int installedRank,
        final CallbackManager callbackManager,
        final ProgressLogger progress) {

        int rank = installedRank;
        final boolean executeInTransaction = executionGroup.shouldExecuteInTransaction();
        if (executeInTransaction) {
            experimentalDatabase.startTransaction();
        }
        for (final MigrationInfo migrationInfo : executionGroup.migrations()) {
            if (!configuration.isMixed() && configuration.isExecuteInTransaction()) {
                validateMixedStatements(configuration, experimentalDatabase, migrationInfo, parsingContext);
            }
            doIndividualMigration(migrationInfo,
                experimentalDatabase,
                configuration,
                migrateResult,
                parsingContext,
                callbackManager,
                rank,
                executeInTransaction,
                progress);
            rank++;
        }
        if (executeInTransaction) {
            experimentalDatabase.commitTransaction();
        }
        return rank;
    }

    private void doIndividualMigration(final MigrationInfo migrationInfo,
        final NativeConnectorsDatabase experimentalDatabase,
        final Configuration configuration,
        final MigrateResult migrateResult,
        final ParsingContext parsingContext,
        final CallbackManager callbackManager,
        final int installedRank,
        final boolean executeInTransaction,
        final ProgressLogger progress) {
        final StopWatch watch = new StopWatch();
        watch.start();

        final AtomicReference<SqlStatement> sqlStatement = new AtomicReference<>();
        final boolean outOfOrder = migrationInfo.getState() == MigrationState.OUT_OF_ORDER
            && configuration.isOutOfOrder();
        final String migrationText = toMigrationText(migrationInfo,
            executeInTransaction,
            experimentalDatabase,
            outOfOrder);
        final Executor<SqlStatement> executor = ExecutorFactory.getExecutor(experimentalDatabase, configuration);
        final Reader<SqlStatement> reader = ReaderFactory.getReader(experimentalDatabase, configuration);

        try {
            if (configuration.isSkipExecutingMigrations()) {
                LOG.debug("Skipping execution of migration of " + migrationText);
                progress.log("Skipping migration of " + migrationInfo.getScript());
            } else {
                LOG.debug("Starting migration of " + migrationText + " ...");
                progress.log("Starting migration of " + migrationInfo.getScript() + " ...");
                if (!migrationInfo.getType().isUndo()) {
                    callbackManager.handleEvent(Event.BEFORE_EACH_MIGRATE,
                        experimentalDatabase,
                        configuration,
                        parsingContext);
                }
                if (!migrationInfo.getType().isUndo()) {
                    LOG.info("Migrating " + migrationText);
                    progress.log("Migrating " + migrationInfo.getScript());
                } else {
                    LOG.info("Undoing migration of " + migrationText);
                }

                if (migrationInfo instanceof final LoadableMigrationInfo loadableMigrationInfo) {
                    final Stream<SqlStatement> executionUnits = reader.read(configuration,
                        experimentalDatabase,
                        parsingContext,
                        loadableMigrationInfo.getLoadableResource(),
                        loadableMigrationInfo.getSqlScriptMetadata());

                    executionUnits.forEach(x -> {
                        sqlStatement.set(x);
                        executor.execute(experimentalDatabase, x, configuration);
                    });
                    executor.finishExecution(experimentalDatabase, configuration);
                }

                if (!migrationInfo.getType().isUndo()) {
                    callbackManager.handleEvent(Event.AFTER_EACH_MIGRATE,
                        experimentalDatabase,
                        configuration,
                        parsingContext);
                }
            }
        } catch (final FlywayException e) {
            watch.stop();
            final int totalTimeMillis = (int) watch.getTotalTimeMillis();
            handleMigrationError(e,
                experimentalDatabase,
                migrationInfo,
                executor,
                sqlStatement.get(),
                migrateResult,
                configuration.getTable(),
                configuration.isOutOfOrder(),
                installedRank,
                experimentalDatabase.getInstalledBy(configuration),
                executeInTransaction,
                totalTimeMillis,
                configuration.getCurrentEnvironmentName());
        }

        watch.stop();

        progress.log("Successfully completed migration of " + migrationInfo.getScript());
        migrateResult.migrationsExecuted += 1;
        final int totalTimeMillis = (int) watch.getTotalTimeMillis();
        migrateResult.putSuccessfulMigration(migrationInfo, totalTimeMillis);
        if (migrationInfo.isVersioned()) {
            migrateResult.targetSchemaVersion = migrationInfo.getVersion().getVersion();
        }
        migrateResult.migrations.add(CommandResultFactory.createMigrateOutput(migrationInfo, totalTimeMillis, null));
        updateSchemaHistoryTable(configuration.getTable(),
            migrationInfo,
            totalTimeMillis,
            installedRank,
            experimentalDatabase,
            experimentalDatabase.getInstalledBy(configuration),
            true);
    }

    private boolean containsNonTransactionalStatements(final Configuration configuration,
        final NativeConnectorsDatabase experimentalDatabase,
        final MigrationInfo migrationInfo,
        final ParsingContext parsingContext) {
        if (migrationInfo instanceof final LoadableMigrationInfo loadableMigrationInfo) {
            try (final SqlStatementIterator sqlStatementIterator = getSqlStatementIterator(experimentalDatabase,
                configuration,
                loadableMigrationInfo,
                parsingContext)) {
                while (sqlStatementIterator.hasNext()) {
                    final SqlStatement sqlStatement = sqlStatementIterator.next();
                    if (!sqlStatement.canExecuteInTransaction()) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    private boolean shouldExecuteInTransaction(final MigrationInfo migrationInfo,
        final Configuration configuration,
        final NativeConnectorsDatabase experimentalDatabase,
        final ParsingContext parsingContext) {
        if (migrationInfo instanceof final LoadableMigrationInfo loadableMigrationInfo) {
            if (loadableMigrationInfo.getSqlScriptMetadata() != null
                && loadableMigrationInfo.getSqlScriptMetadata().executeInTransaction() != null) {
                return loadableMigrationInfo.getSqlScriptMetadata().executeInTransaction();
            }
        }
        return configuration.isExecuteInTransaction() && !containsNonTransactionalStatements(configuration,
            experimentalDatabase,
            migrationInfo,
            parsingContext);
    }

    private SqlStatementIterator getSqlStatementIterator(final NativeConnectorsDatabase experimentalDatabase,
        final Configuration configuration,
        final LoadableMigrationInfo loadableMigrationInfo,
        final ParsingContext parsingContext) {
        final Parser parser = (Parser) experimentalDatabase.getParser().apply(configuration, parsingContext);
        final SqlScriptMetadata metadata = loadableMigrationInfo.getSqlScriptMetadata();
        return parser.parse(loadableMigrationInfo.getLoadableResource(), metadata);
    }

    private void handleMigrationError(final FlywayException e,
        final NativeConnectorsDatabase experimentalDatabase,
        final MigrationInfo migrationInfo,
        final Executor<SqlStatement> executor,
        final SqlStatement sqlStatement,
        final MigrateResult migrateResult,
        final String schemaHistoryTableName,
        final boolean outOfOrder,
        final int installedRank,
        final String installedBy,
        final boolean executeInTransaction,
        final int totalTimeMillis,
        final String environment) {

        final String migrationText = toMigrationText(migrationInfo,
            executeInTransaction,
            experimentalDatabase,
            outOfOrder);
        final String failedMsg;
        if (!migrationInfo.getType().isUndo()) {
            failedMsg = "Migration of " + migrationText + " failed!";
        } else {
            failedMsg = "Undo of migration of " + migrationText + " failed!";
        }

        migrateResult.putFailedMigration(migrationInfo, totalTimeMillis);
        migrateResult.setSuccess(false);
        if (executeInTransaction) {
            experimentalDatabase.rollbackTransaction();
        }
        if (experimentalDatabase.supportsTransactions() && executeInTransaction) {
            LOG.error(failedMsg + " Changes successfully rolled back.");
        } else {
            LOG.error(failedMsg + " Please restore backups and roll back database and code!");
            updateSchemaHistoryTable(schemaHistoryTableName,
                migrationInfo,
                totalTimeMillis,
                installedRank,
                experimentalDatabase,
                installedBy,
                false);
        }
        if (sqlStatement == null) {
            throw new FlywayMigrateException(migrationInfo, e.getMessage(), executeInTransaction, migrateResult);
        } else {
            final String message = calculateErrorMessage(e, migrationInfo, executor, sqlStatement, environment);
            throw new FlywayMigrateException(migrationInfo,
                outOfOrder,
                message,
                e,
                executeInTransaction,
                migrateResult,
                sqlStatement);
        }
    }

    private String calculateErrorMessage(final Exception e,
        final MigrationInfo migrationInfo,
        final Executor<SqlStatement> executor,
        final SqlStatement sqlStatement,
        final String environment) {

        final String title = ErrorUtils.getScriptExecutionErrorMessageTitle(Paths.get(migrationInfo.getScript()).getFileName(), environment);

        String message = null;
        if (e.getCause() instanceof final SQLException sqlException) {
            message = ExceptionUtils.toMessage(sqlException);
        }

        LoadableResource loadableResource = null;
        if (migrationInfo instanceof final LoadableMigrationInfo loadableMigrationInfo) {
            loadableResource = loadableMigrationInfo.getLoadableResource();
        }

        return ErrorUtils.calculateErrorMessage(title,
            loadableResource,
            migrationInfo.getPhysicalLocation(),
            executor,
            sqlStatement,
            message);
    }

    private void validateMixedStatements(final Configuration configuration,
        final NativeConnectorsDatabase experimentalDatabase,
        final MigrationInfo migrationInfo,
        final ParsingContext parsingContext) {
        if (migrationInfo instanceof final LoadableMigrationInfo loadableMigrationInfo) {
            try (final SqlStatementIterator sqlStatementIterator = getSqlStatementIterator(experimentalDatabase,
                configuration,
                loadableMigrationInfo,
                parsingContext)) {
                boolean haveFoundNonTransactionalStatements = false;
                boolean haveFoundTransactionalStatements = false;
                while (sqlStatementIterator.hasNext()) {
                    final SqlStatement sqlStatement = sqlStatementIterator.next();
                    if (sqlStatement.canExecuteInTransaction()) {
                        haveFoundTransactionalStatements = true;
                    } else {
                        haveFoundNonTransactionalStatements = true;
                    }
                    if (haveFoundTransactionalStatements && haveFoundNonTransactionalStatements) {
                        throw new FlywayException(
                            "Detected both transactional and non-transactional statements within the same migration"
                                + " (even though mixed is false). Offending statement found at line "
                                + sqlStatement.getLineNumber()
                                + ": "
                                + sqlStatement.getSql()
                                + (sqlStatement.canExecuteInTransaction() ? "" : " [non-transactional]"));
                    }
                }
            }
        }
    }
}