DefaultSqlScriptExecutor.java

/*-
 * ========================LICENSE_START=================================
 * flyway-core
 * ========================================================================
 * Copyright (C) 2010 - 2025 Red Gate Software Ltd
 * ========================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * =========================LICENSE_END==================================
 */
package org.flywaydb.core.internal.sqlscript;

import lombok.CustomLog;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.callback.Error;
import org.flywaydb.core.api.callback.Event;
import org.flywaydb.core.api.callback.Warning;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.exception.FlywayBlockStatementExecutionException;
import org.flywaydb.core.internal.callback.CallbackExecutor;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
import org.flywaydb.core.internal.jdbc.Result;
import org.flywaydb.core.internal.jdbc.Results;
import org.flywaydb.core.internal.jdbc.StatementInterceptor;
import org.flywaydb.core.internal.util.AsciiTable;
import org.flywaydb.core.internal.util.StringUtils;

import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@CustomLog
public class DefaultSqlScriptExecutor implements SqlScriptExecutor {
    protected final JdbcTemplate jdbcTemplate;

    /**
     * The callback executor.
     */
    private final CallbackExecutor callbackExecutor;













    /**
     * The maximum number of statements to include in a batch.
     */
    private static final int MAX_BATCH_SIZE = 100;

    /**
     * Whether to batch SQL statements.
     */
    private final boolean batch;

    /**
     * Whether to output query results table.
     */
    protected final boolean outputQueryResults;



    public DefaultSqlScriptExecutor(JdbcTemplate jdbcTemplate,
                                    CallbackExecutor callbackExecutor, boolean undo, boolean batch, boolean outputQueryResults,
                                    StatementInterceptor statementInterceptor
                                   ) {
        this.jdbcTemplate = jdbcTemplate;




        this.callbackExecutor = callbackExecutor;
        this.outputQueryResults = outputQueryResults;
        this.batch = batch;
    }

    @Override
    public List<Results> execute(SqlScript sqlScript, Configuration config) {
        final List<Results> results = new ArrayList<>(List.of());






        List<SqlStatement> batchStatements = new ArrayList<>();

        try (SqlStatementIterator sqlStatementIterator = sqlScript.getSqlStatements()) {
            SqlStatement sqlStatement;
            while ((sqlStatement = sqlStatementIterator.next()) != null) {













                if (batch) {
                    if (sqlStatement.isBatchable()) {
                        logStatementExecution(sqlStatement);
                        batchStatements.add(sqlStatement);
                        if (batchStatements.size() >= MAX_BATCH_SIZE) {
                            results.add(executeBatch(jdbcTemplate, sqlScript, batchStatements, config));
                            batchStatements = new ArrayList<>();
                        }
                    } else {
                        // Execute the batch up to this point
                        results.add(executeBatch(jdbcTemplate, sqlScript, batchStatements, config));
                        batchStatements = new ArrayList<>();
                        // Now execute this non-batchable statement. We'll resume batching after this one.
                        results.add(executeStatement(jdbcTemplate, sqlScript, sqlStatement, config));
                    }
                } else {
                    results.add(executeStatement(jdbcTemplate, sqlScript, sqlStatement, config));
                }
            }
        }

        if (batch) {
            // Execute any remaining batch statements that haven't yet been sent to the database
            results.add(executeBatch(jdbcTemplate, sqlScript, batchStatements, config));
        }
        return results;
    }

    protected void logStatementExecution(SqlStatement sqlStatement) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Executing "
                              + (batch && sqlStatement.isBatchable() ? "batchable " : "")
                              + "SQL: " + sqlStatement.getSql());
        }
    }

    private Results executeBatch(JdbcTemplate jdbcTemplate, SqlScript sqlScript, List<SqlStatement> batchStatements, Configuration config) {
        if (batchStatements.isEmpty()) {
            return null;
        }

        LOG.debug("Sending batch of " + batchStatements.size() + " statements to database ...");
        List<String> sqlBatch = new ArrayList<>();
        for (SqlStatement sqlStatement : batchStatements) {
            try {
                handleEachMigrateOrUndoStatementCallback(Event.BEFORE_EACH_UNDO_STATEMENT, Event.BEFORE_EACH_MIGRATE_STATEMENT, sqlStatement.getSql() + sqlStatement.getDelimiter(), null, null);
            } catch (FlywayBlockStatementExecutionException e) {
                LOG.debug("Statement on line " + sqlStatement.getLineNumber() + " + skipped due to " + e.getMessage());
                continue;
            }
            sqlBatch.add(sqlStatement.getSql());
        }

        Results results = jdbcTemplate.executeBatch(sqlBatch, config);

        if (results.getException() != null) {
            handleException(results, sqlScript, batchStatements.get(0), config);

            for (int i = 0; i < results.getResults().size(); i++) {
                SqlStatement sqlStatement = batchStatements.get(i);
                long updateCount = results.getResults().get(i).updateCount();
                if (updateCount == Statement.EXECUTE_FAILED) {
                    handleEachMigrateOrUndoStatementCallback(Event.AFTER_EACH_UNDO_STATEMENT_ERROR, Event.AFTER_EACH_MIGRATE_STATEMENT_ERROR, sqlStatement.getSql() + sqlStatement.getDelimiter(), results.getWarnings(), results.getErrors());
                    handleException(results, sqlScript, batchStatements.get(i), config);
                } else if (updateCount != Statement.SUCCESS_NO_INFO) {
                    handleEachMigrateOrUndoStatementCallback(Event.AFTER_EACH_UNDO_STATEMENT, Event.AFTER_EACH_MIGRATE_STATEMENT, sqlStatement.getSql() + sqlStatement.getDelimiter(), results.getWarnings(), results.getErrors());
                    handleUpdateCount(updateCount);
                }
            }
            return results;
        }

        for (int i = 0; i < results.getResults().size(); i++) {
            SqlStatement sqlStatement = batchStatements.get(i);
            handleEachMigrateOrUndoStatementCallback(Event.AFTER_EACH_UNDO_STATEMENT, Event.AFTER_EACH_MIGRATE_STATEMENT, sqlStatement.getSql() + sqlStatement.getDelimiter(), results.getWarnings(), results.getErrors());
        }
        handleResults(results);
        return results;
    }

    protected Results executeStatement(JdbcTemplate jdbcTemplate, SqlScript sqlScript, SqlStatement sqlStatement, Configuration config) {
        logStatementExecution(sqlStatement);
        String sql = sqlStatement.getSql() + sqlStatement.getDelimiter();

        try {
            handleEachMigrateOrUndoStatementCallback(Event.BEFORE_EACH_UNDO_STATEMENT, Event.BEFORE_EACH_MIGRATE_STATEMENT, sql, null, null);
        } catch (FlywayBlockStatementExecutionException e) {
            LOG.debug("Statement on line " + sqlStatement.getLineNumber() + " + skipped due to " + e.getMessage());
            return null;
        }

        Results results = sqlStatement.execute(jdbcTemplate, this, config);

        if (results.getException() != null) {
            handleEachMigrateOrUndoStatementCallback(Event.AFTER_EACH_UNDO_STATEMENT_ERROR, Event.AFTER_EACH_MIGRATE_STATEMENT_ERROR, sql, results.getWarnings(), results.getErrors());
            printWarnings(results);
            handleException(results, sqlScript, sqlStatement, config);
            return null;
        }

        handleEachMigrateOrUndoStatementCallback(Event.AFTER_EACH_UNDO_STATEMENT, Event.AFTER_EACH_MIGRATE_STATEMENT, sql, results.getWarnings(), results.getErrors());
        printWarnings(results);
        handleResults(results);
        return results;
    }

    protected void handleResults(Results results) {
        for (Result result : results.getResults()) {
            long updateCount = result.updateCount();
            if (updateCount != -1) {
                handleUpdateCount(updateCount);
            }

            outputQueryResult(result);

        }
    }

    protected void outputQueryResult(Result result) {
        if (outputQueryResults &&
                result.columns() != null && !result.columns().isEmpty()) {
            LOG.info(new AsciiTable(result.columns(), result.data(),
                true, "", "No rows returned").render());
        }
    }

    private void handleUpdateCount(long updateCount) {
        LOG.debug(updateCount + " row" + StringUtils.pluralizeSuffix(updateCount) + " affected");
    }

    protected void handleException(Results results, SqlScript sqlScript, SqlStatement sqlStatement, Configuration config) {




                throw new FlywaySqlScriptException(sqlScript.getResource(), sqlStatement, results.getException(), config.getCurrentEnvironmentName());




    }

    private void printWarnings(Results results) {
        for (Warning warning : results.getWarnings()) {



                if ("00000".equals(warning.getState())) {
                    LOG.info("DB: " + warning.getMessage());
                } else {
                    LOG.warn("DB: " + warning.getMessage()
                                     + " (SQL State: " + warning.getState() + " - Error Code: " + warning.getCode() + ")");
                }



        }
    }

    private void handleEachMigrateOrUndoStatementCallback(Event eventUndo, Event eventMigrate, String sql, List<Warning> warnings, List<Error> errors) {







        callbackExecutor.onEachMigrateOrUndoStatementEvent(eventMigrate, sql, warnings, errors);
    }
}