JdbcTableSchemaHistory.java

/*-
 * ========================LICENSE_START=================================
 * flyway-core
 * ========================================================================
 * Copyright (C) 2010 - 2025 Red Gate Software Ltd
 * ========================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * =========================LICENSE_END==================================
 */
package org.flywaydb.core.internal.schemahistory;

import lombok.CustomLog;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.MigrationPattern;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.output.CommandResultFactory;
import org.flywaydb.core.api.output.RepairResult;
import org.flywaydb.core.api.resolver.ResolvedMigration;
import org.flywaydb.core.extensibility.AppliedMigration;
import org.flywaydb.core.extensibility.MigrationType;
import org.flywaydb.core.internal.database.base.Connection;
import org.flywaydb.core.internal.database.base.Database;
import org.flywaydb.core.internal.database.base.Table;
import org.flywaydb.core.internal.exception.FlywaySqlException;
import org.flywaydb.core.internal.jdbc.ExecutionTemplateFactory;
import org.flywaydb.core.internal.jdbc.JdbcNullTypes;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
import org.flywaydb.core.internal.sqlscript.SqlScriptExecutorFactory;
import org.flywaydb.core.internal.sqlscript.SqlScriptFactory;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.Callable;
import org.flywaydb.core.internal.util.Pair;
import org.flywaydb.core.internal.util.StringUtils;

/**
 * Supports reading and writing to the schema history table.
 *
 * <p>All access goes through the database's main connection and its {@link JdbcTemplate}. Applied migrations
 * that have been read are kept in an in-memory cache which is extended incrementally (by installed rank) on
 * each read and discarded via {@link #clearCache()} whenever existing rows are changed or removed.</p>
 */
@CustomLog
class JdbcTableSchemaHistory extends SchemaHistory {
    /**
     * Number of failed creation attempts after which {@link #create(boolean)} gives up and rethrows.
     */
    private static final int MAX_CREATE_ATTEMPTS = 10;

    /**
     * Pause between two creation attempts, in milliseconds.
     */
    private static final long CREATE_RETRY_DELAY_MILLIS = 1000L;

    private final SqlScriptExecutorFactory sqlScriptExecutorFactory;
    private final SqlScriptFactory sqlScriptFactory;

    /**
     * The database to use.
     */
    private final Database database;

    /**
     * Connection with access to the database.
     */
    private final Connection<?> connection;

    /**
     * JDBC template of {@link #connection}, used for all statements issued by this class.
     */
    private final JdbcTemplate jdbcTemplate;

    /**
     * Applied migration cache.
     */
    private final LinkedList<AppliedMigration> cache = new LinkedList<>();

    private final Configuration configuration;

    /**
     * Creates a new instance of the schema history table support.
     *
     * @param sqlScriptExecutorFactory Factory for the executor that runs the table creation script.
     * @param sqlScriptFactory         Factory passed to the database when it builds the creation script.
     * @param database                 The database to use.
     * @param table                    The schema history table used by Flyway.
     * @param configuration            The configuration whose plugin register supplies {@link AppliedMigration}
     *                                 implementations.
     */
    JdbcTableSchemaHistory(SqlScriptExecutorFactory sqlScriptExecutorFactory, SqlScriptFactory sqlScriptFactory,
                           Database database, Table table, Configuration configuration) {
        this.sqlScriptExecutorFactory = sqlScriptExecutorFactory;
        this.sqlScriptFactory = sqlScriptFactory;
        this.table = table;
        this.database = database;
        this.connection = database.getMainConnection();
        this.jdbcTemplate = connection.getJdbcTemplate();
        this.configuration = configuration;
    }

    /**
     * Discards all cached applied migrations so that the next read fetches every row again.
     */
    @Override
    public void clearCache() {
        cache.clear();
    }

    /**
     * Checks whether the schema history table exists, after restoring the connection's original state.
     *
     * @return {@code true} if the table exists.
     */
    @Override
    public boolean exists() {
        connection.restoreOriginalState();

        return table.exists();
    }

    /**
     * Creates the schema history table if it does not exist yet, while holding the table lock.
     *
     * <p>A failed attempt is retried after a pause of {@link #CREATE_RETRY_DELAY_MILLIS} ms; after
     * {@link #MAX_CREATE_ATTEMPTS} failures the last {@link FlywayException} is rethrown. If the thread is
     * interrupted while waiting, the interrupt status is restored and the most recent failure is rethrown
     * instead of retrying further.</p>
     *
     * @param baseline Whether the creation script should also include a baseline marker.
     * @throws FlywayException if the table could not be created.
     */
    @Override
    public void create(final boolean baseline) {
        connection.lock(table, () -> {
            int retries = 0;
            // Re-check existence on every iteration: the table may have appeared in the meantime.
            while (!exists()) {
                if (retries == 0) {
                    LOG.info("Creating Schema History table " + table + (baseline ? " with baseline" : "") + " ...");
                }
                try {
                    ExecutionTemplateFactory.createExecutionTemplate(connection.getJdbcConnection(), database)
                                            .execute(() -> {
                                                sqlScriptExecutorFactory.createSqlScriptExecutor(connection.getJdbcConnection(), false, false, true)
                                                                        .execute(database.getCreateScript(sqlScriptFactory, table, baseline), database.getConfiguration());
                                                LOG.debug("Created Schema History table " + table + (baseline ? " with baseline" : ""));
                                                return null;
                                            });
                } catch (FlywayException e) {
                    if (++retries >= MAX_CREATE_ATTEMPTS) {
                        throw e;
                    }
                    try {
                        LOG.debug("Schema History table creation failed. Retrying in 1 sec ...");
                        Thread.sleep(CREATE_RETRY_DELAY_MILLIS);
                    } catch (InterruptedException interrupted) {
                        // Preserve the interrupt for callers and stop retrying: with the flag set, every
                        // further sleep would fail immediately and the loop would spin without pausing.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
            return null;
        });
    }

    /**
     * Drops the schema history table (under the table lock) if it exists and clears the cache afterwards.
     */
    @Override
    public void drop() {
        if (!exists()) {
            LOG.info("Dropping Schema History table " + table + " not necessary as table does not exist");
            return;
        }

        LOG.info("Dropping Schema History table " + table);
        connection.lock(table, () -> {
            table.drop();
            return null;
        });
        clearCache();
    }

    /**
     * Runs the given callable while holding the lock on the schema history table.
     *
     * @param callable The action to execute under the lock.
     * @param <T>      The result type of the action.
     * @return The value returned by {@code connection.lock} for the callable.
     */
    @Override
    public <T> T lock(Callable<T> callable) {
        connection.restoreOriginalState();

        return connection.lock(table, callable);
    }

    /**
     * Inserts one row describing an applied migration into the schema history table.
     *
     * @param installedRank The installed rank of the new row.
     * @param version       The migration version, or {@code null} (stored as a typed SQL NULL).
     * @param description   The migration description; an empty description is replaced by
     *                      {@code NO_DESCRIPTION_MARKER} when the database does not support empty descriptions.
     * @param type          The migration type; its {@code name()} is stored.
     * @param script        The script name.
     * @param checksum      The checksum, or {@code null} (stored as a typed SQL NULL).
     * @param executionTime The execution time to record.
     * @param success       Whether the migration succeeded.
     * @throws FlywaySqlException if the insert fails.
     */
    @Override
    protected void doAddAppliedMigration(int installedRank, MigrationVersion version, String description,
                                         MigrationType type, String script, Integer checksum,
                                         int executionTime, boolean success) {
        boolean tableIsLocked = false;
        connection.restoreOriginalState();

        // Lock again for databases with no clean DDL transactions like Oracle
        // to prevent implicit commits from triggering deadlocks
        // in highly concurrent environments
        if (!database.supportsDdlTransactions()) {
            table.lock();
            tableIsLocked = true;
        }

        try {
            String versionStr = version == null ? null : version.toString();

            if (!database.supportsEmptyMigrationDescription() && "".equals(description)) {
                description = NO_DESCRIPTION_MARKER;
            }

            // Typed null markers are passed instead of plain nulls for the nullable columns.
            Object versionObj = versionStr == null ? JdbcNullTypes.StringNull : versionStr;
            Object checksumObj = checksum == null ? JdbcNullTypes.IntegerNull : checksum;

            jdbcTemplate.update(database.getInsertStatement(table),
                                installedRank, versionObj, description, type.name(), script, checksumObj, database.getInstalledBy(),
                                executionTime, success);

            LOG.debug("Schema History table " + table + " successfully updated to reflect changes");
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to insert row for version '" + version + "' in Schema History table " + table, e);
        } finally {
            // Only release a lock that this call actually acquired.
            if (tableIsLocked) {
                table.unlock();
            }
        }
    }

    /**
     * Returns all applied migrations recorded in the schema history table.
     *
     * <p>When the table exists, the returned list is this instance's internal cache (not a copy), brought up
     * to date before returning.</p>
     *
     * @return The applied migrations, or a new empty list if the table does not exist yet.
     */
    @Override
    public List<AppliedMigration> allAppliedMigrations() {
        if (!exists()) {
            LOG.info(String.format("Schema history table %s does not exist yet", table.toString()));
            return new ArrayList<>();
        }

        refreshCache();
        return cache;
    }

    /**
     * Appends to the cache the rows selected by the database's select statement, passing the highest cached
     * installed rank (or -1 for an empty cache) as the statement parameter.
     *
     * @throws FlywaySqlException if the query fails.
     */
    private void refreshCache() {
        int maxCachedInstalledRank = cache.isEmpty() ? -1 : cache.getLast().getInstalledRank();
        String query = database.getSelectStatement(table);

        // The column layout is the same for every row of one result set, so it is resolved on the first
        // row only and reused for the remaining rows of this query.
        final Map<String, Integer> columnOrdinalMap = new HashMap<>();

        try {
            cache.addAll(jdbcTemplate.query(query, rs -> {
                if (columnOrdinalMap.isEmpty()) {
                    columnOrdinalMap.putAll(constructColumnOrdinalMap(rs));
                }
                return toAppliedMigration(rs, columnOrdinalMap);
            }, maxCachedInstalledRank));
        } catch (SQLException e) {
            throw new FlywaySqlException("Error while retrieving the list of applied migrations from Schema History table " + table, e);
        }
    }

    /**
     * Converts the current row of the result set into an {@link AppliedMigration}.
     *
     * @param rs               The result set, positioned on the row to read.
     * @param columnOrdinalMap Lower-cased column names mapped to their 1-based ordinals.
     * @return The applied migration, created by the first registered {@link AppliedMigration} plugin that
     * handles the row's type, or by {@code BaseAppliedMigration} if none does.
     * @throws SQLException if a column cannot be read.
     */
    private AppliedMigration toAppliedMigration(ResultSet rs, Map<String, Integer> columnOrdinalMap) throws SQLException {
        Integer checksum = null;
        try {
            checksum = rs.getInt(columnOrdinalMap.get("checksum"));
        } catch (NumberFormatException ignored) {
            // A checksum value that cannot be parsed as a number is treated as "no checksum".
        }
        if (rs.wasNull()) {
            checksum = null;
        }

        int installedRank = rs.getInt(columnOrdinalMap.get("installed_rank"));
        // Read the version column once and reuse the value.
        String versionStr = rs.getString(columnOrdinalMap.get("version"));
        MigrationVersion version = versionStr != null ? MigrationVersion.fromVersion(versionStr) : null;
        String description = rs.getString(columnOrdinalMap.get("description"));
        String type = rs.getString(columnOrdinalMap.get("type"));
        String script = rs.getString(columnOrdinalMap.get("script"));
        String installedBy = rs.getString(columnOrdinalMap.get("installed_by"));
        int executionTime = rs.getInt(columnOrdinalMap.get("execution_time"));
        boolean success = rs.getBoolean(columnOrdinalMap.get("success"));
        Timestamp installedOn = rs.getTimestamp(columnOrdinalMap.get("installed_on"));
        if (installedOn == null) {
            // Fall back to parsing the textual value when no timestamp was returned.
            String installedOnStr = rs.getString(columnOrdinalMap.get("installed_on"));
            if (StringUtils.hasText(installedOnStr)) {
                try {
                    installedOn = Timestamp.valueOf(installedOnStr);
                } catch (IllegalArgumentException ignored) {
                    // Text is not in the format Timestamp.valueOf accepts; keep installedOn as null.
                }
            }
        }

        return configuration.getPluginRegister().getPlugins(AppliedMigration.class).stream()
                            .filter(am -> am.handlesType(type))
                            .findFirst()
                            // Only allocate the fallback when no plugin handles this type.
                            .orElseGet(BaseAppliedMigration::new)
                            .create(installedRank, version, description, type, script, checksum, installedOn, installedBy, executionTime, success);
    }

    /**
     * Constructs a map of lower-cased column names to ordinals. This is useful for databases that
     * upper-case them - e.g. Snowflake with QUOTED-IDENTIFIERS-IGNORE-CASE turned on.
     *
     * @param rs The result set whose metadata is inspected.
     * @return The column names (lower-cased independently of the default locale) mapped to 1-based ordinals.
     * @throws SQLException if the metadata cannot be read.
     */
    private Map<String, Integer> constructColumnOrdinalMap(ResultSet rs) throws SQLException {
        Map<String, Integer> columnOrdinalMap = new HashMap<>();
        ResultSetMetaData metadata = rs.getMetaData();
        int columnCount = metadata.getColumnCount();

        // Careful - column ordinals in JDBC start at 1
        for (int i = 1; i <= columnCount; i++) {
            // Locale.ROOT keeps the result stable: with e.g. a Turkish default locale "I" would become a
            // dotless i and lookups such as "installed_rank" would miss.
            String columnNameLower = metadata.getColumnName(i).toLowerCase(Locale.ROOT);
            columnOrdinalMap.put(columnNameLower, i);
        }

        return columnOrdinalMap;
    }

    /**
     * Deletes the rows of failed migrations from the schema history table and records them in the result.
     *
     * @param repairResult           Receives the removed migrations in {@code migrationsRemoved}.
     * @param migrationPatternFilter Patterns restricting which migrations are considered; {@code null} means
     *                               all applied migrations are considered.
     * @return {@code true} if failed migrations were found and processed, {@code false} if the table does not
     * exist or no failed migration matched.
     * @throws FlywaySqlException if a delete statement fails.
     */
    @Override
    public boolean removeFailedMigrations(final RepairResult repairResult, final MigrationPattern[] migrationPatternFilter) {
        if (!exists()) {
            LOG.info("Repair of failed migration in Schema History table " + table + " not necessary as table doesn't exist.");
            return false;
        }

        final List<AppliedMigration> failedAppliedMigrations = filterMigrations(allAppliedMigrations(), migrationPatternFilter)
            .stream()
            .filter(fam -> !fam.isSuccess())
            .toList();

        if (failedAppliedMigrations.isEmpty()) {
            LOG.info("Repair of failed migration in Schema History table " + table + " not necessary. No failed migration detected.");
            return false;
        }

        try {
            repairResult.migrationsRemoved = failedAppliedMigrations.stream().map(CommandResultFactory::createRepairOutput).toList();

            for (final AppliedMigration appliedMigration : failedAppliedMigrations) {
                // Versioned migrations are deleted by version, all others by description.
                final Pair<String, Object> deleteStatement;
                if (appliedMigration.getVersion() != null) {
                    deleteStatement = database.getDeleteStatement(table, true, appliedMigration.getVersion().getVersion());
                } else {
                    deleteStatement = database.getDeleteStatement(table, false, appliedMigration.getDescription());
                }

                if (deleteStatement != null) {
                    jdbcTemplate.execute(deleteStatement.getLeft(), deleteStatement.getRight());
                }
            }
        } catch (final SQLException e) {
            throw new FlywaySqlException("Unable to repair Schema History table " + table, e);
        } finally {
            // Also drop the cache when a delete fails part-way, so rows that were already removed
            // are not served from memory afterwards.
            clearCache();
        }

        return true;
    }

    /**
     * Restricts the applied migrations to those matching at least one of the given patterns.
     *
     * @param appliedMigrations      The migrations to filter.
     * @param migrationPatternFilter The patterns to match against, or {@code null} for no filtering.
     * @return The given list itself if the filter is {@code null}; otherwise a new list of the matching
     * migrations without duplicates, in the order in which they occur in {@code appliedMigrations}.
     */
    private List<AppliedMigration> filterMigrations(List<AppliedMigration> appliedMigrations, MigrationPattern[] migrationPatternFilter) {
        if (migrationPatternFilter == null) {
            return appliedMigrations;
        }

        // Insertion-ordered set: de-duplicates while keeping the input order, so the resulting
        // deletions and the reported repair output are deterministic.
        Set<AppliedMigration> matching = new LinkedHashSet<>();

        for (AppliedMigration appliedMigration : appliedMigrations) {
            for (MigrationPattern migrationPattern : migrationPatternFilter) {
                if (migrationPattern.matches(appliedMigration.getVersion(), appliedMigration.getDescription())) {
                    matching.add(appliedMigration);
                    // One matching pattern is enough; further matches would add nothing.
                    break;
                }
            }
        }

        return new ArrayList<>(matching);
    }

    /**
     * Updates the description, type and checksum of an existing row (identified by installed rank) to the
     * values of the resolved migration. A synthetic applied type is kept instead of the resolved type.
     *
     * @param appliedMigration  The row to repair.
     * @param resolvedMigration The migration providing the new description, checksum and type.
     * @throws FlywaySqlException if the update fails.
     */
    @Override
    public void update(AppliedMigration appliedMigration, ResolvedMigration resolvedMigration) {
        connection.restoreOriginalState();

        clearCache();

        MigrationVersion version = appliedMigration.getVersion();

        String description = resolvedMigration.getDescription();
        Integer checksum = resolvedMigration.getChecksum();
        MigrationType type = appliedMigration.getType().isSynthetic()
                ? appliedMigration.getType()
                : resolvedMigration.getType();

        LOG.info("Repairing Schema History table for version " + version
                         + " (Description: " + description + ", Type: " + type + ", Checksum: " + checksum + ")  ...");

        if (!database.supportsEmptyMigrationDescription() && "".equals(description)) {
            description = NO_DESCRIPTION_MARKER;
        }

        Object checksumObj = checksum == null ? JdbcNullTypes.IntegerNull : checksum;

        try {
            jdbcTemplate.update(database.getUpdateStatement(table), description, type.name(), checksumObj, appliedMigration.getInstalledRank());
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to repair Schema History table " + table
                                                 + " for version " + version, e);
        }
    }

    /**
     * Marks an applied migration as deleted by inserting an additional row of type {@code "DELETE"} that
     * repeats its version, description, script, checksum and success flag with an execution time of 0.
     *
     * @param appliedMigration The migration to mark as deleted.
     * @throws FlywaySqlException if the insert fails.
     */
    @Override
    public void delete(AppliedMigration appliedMigration) {
        connection.restoreOriginalState();

        clearCache();

        MigrationVersion version = appliedMigration.getVersion();

        final Object versionObj;
        if (version == null) {
            LOG.info("Repairing Schema History table for description \"" + appliedMigration.getDescription() + "\" (Marking as DELETED)  ...");
            versionObj = JdbcNullTypes.StringNull;
        } else {
            LOG.info("Repairing Schema History table for version \"" + version + "\" (Marking as DELETED)  ...");
            versionObj = version.toString();
        }

        Object checksumObj = appliedMigration.getChecksum() == null ? JdbcNullTypes.IntegerNull : appliedMigration.getChecksum();

        try {
            jdbcTemplate.update(
                    database.getInsertStatement(table),
                    calculateInstalledRank(appliedMigration.getType()),
                    versionObj, appliedMigration.getDescription(), "DELETE", appliedMigration.getScript(),
                    checksumObj, database.getInstalledBy(), 0, appliedMigration.isSuccess());
        } catch (SQLException e) {
            throw new FlywaySqlException("Unable to repair Schema History table " + table
                                                 + " for version " + version, e);
        }
    }
}