InsertRowLock.java
/*-
* ========================LICENSE_START=================================
* flyway-core
* ========================================================================
* Copyright (C) 2010 - 2026 Red Gate Software Ltd
* ========================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/
package org.flywaydb.core.internal.database;
import lombok.CustomLog;
import org.flywaydb.core.internal.database.base.Table;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
import org.flywaydb.core.internal.jdbc.Results;
import java.math.BigInteger;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
/**
* Distributed locking mechanism using row insertion in the Flyway schema history table.
*
* <p>This class implements a database-based locking strategy to prevent multiple Flyway instances
* from simultaneously migrating the same database. The lock is implemented by inserting a special
* row into the schema history table with a fixed installed_rank value (-100). The primary key
* constraint on installed_rank ensures only one instance can hold the lock at a time.</p>
*
* <p>The lock is kept alive by a background thread that periodically updates the lock row's timestamp.
* Locks that haven't been updated within {@link #LOCK_TIMEOUT_MINS} are considered expired and can be
* cleaned up by other instances attempting to acquire the lock.</p>
*/
@CustomLog
public class InsertRowLock {
private static final Random random = new Random();
private static final int NUM_THREADS = 2;
/**
* A random string used as an ID for this instance of Flyway.
*/
private final String tableLockString = getNextRandomString();
private final JdbcTemplate jdbcTemplate;
public static final int LOCK_TIMEOUT_MINS = 10;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> scheduledFuture;
/**
* Description field value used in the lock row to identify it as a Flyway lock.
*/
public static String FLYWAY_LOCK_STRING = "flyway-lock";
public InsertRowLock(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.executor = createScheduledExecutor();
}
/**
* Gets the unique lock identifier for this Flyway instance.
*
* @return the unique lock identifier stored in the lock row's 'version' column
*/
public String getLockId() {
return this.tableLockString;
}
/**
* Acquires a lock on the schema history table using primary key constraint.
*
* <p>This method assumes the database supports primary key constraints on 'installed_rank'.
* The lock is acquired by inserting a row with installed_rank = -100. If another instance
* already holds the lock, this method will retry every second until the lock becomes available.</p>
*
* @param insertStatementTemplate template for INSERT statement with placeholders ({@link org.flywaydb.core.internal.database.base.Database#getInsertStatement(Table)})
* @param updateLockStatement SQL statement to update the lock timestamp, takes a single '?' placeholder for the 'version' column that will be replaced with {@link #tableLockString}
* @param deleteExpiredLockStatement SQL statement to delete expired locks, takes a single '?' placeholder that will be replaced with a timestamp to remove locks older than the cutoff
* @param booleanTrue database-specific representation of boolean true value
* @throws SQLException if a database error occurs
*/
public void doLock(String insertStatementTemplate, String updateLockStatement, String deleteExpiredLockStatement, String booleanTrue) throws SQLException {
doLock(insertStatementTemplate, updateLockStatement, deleteExpiredLockStatement, "0", booleanTrue, (jdbcTemplate, insertStatement) -> {
// Insert the locking row - the primary key-ness of 'installed_rank' will prevent us having two
Results results = jdbcTemplate.executeStatement(insertStatement);
// Succeed if there were no errors
return results.getException() == null;
});
}
/**
* Acquires a lock on the schema history table using a custom locking strategy.
*
* <p>This is the main locking method that supports different database-specific locking strategies
* through the lockStrategy predicate. The method will:</p>
* <ol>
* <li>Delete any expired locks from previous instances</li>
* <li>Attempt to insert a lock row using the provided strategy</li>
* <li>If successful, start a background thread to keep the lock alive</li>
* <li>If unsuccessful, retry every second (up to 50 retries with debug logging, then error logging)</li>
* </ol>
*
* @param insertStatementTemplate template for INSERT statement with placeholders ({@link org.flywaydb.core.internal.database.base.Database#getInsertStatement(Table)})
* @param updateLockStatement SQL statement to update the lock timestamp, takes a single '?' placeholder for the 'version' column that will be replaced with {@link #tableLockString}
* @param deleteExpiredLockStatement SQL statement to delete expired locks, takes a single '?' placeholder that will be replaced with a timestamp to remove locks older than the cutoff
* @param checksumValue checksum value to use in the lock row
* @param booleanTrue database-specific representation of boolean true value
* @param lockStrategy predicate that attempts to acquire the lock and returns true on success. Takes (JdbcTemplate, String insertStatement) where the statement is the formatted INSERT for the lock row
* @throws SQLException if a database error occurs
*/
public void doLock(String insertStatementTemplate,
String updateLockStatement,
String deleteExpiredLockStatement,
String checksumValue,
String booleanTrue,
BiPredicate<JdbcTemplate, String> lockStrategy) throws SQLException {
int retryCount = 0;
while (true) {
try {
jdbcTemplate.execute(generateDeleteExpiredLockStatement(deleteExpiredLockStatement));
if (insertLockingRow(insertStatementTemplate, checksumValue, booleanTrue, lockStrategy)) {
scheduledFuture = startLockWatchingThread(String.format(updateLockStatement.replace("?", "%s"), tableLockString));
return;
}
if (retryCount < 50) {
retryCount++;
LOG.debug("Waiting for lock on Flyway schema history table");
} else {
LOG.error("Waiting for lock on Flyway schema history table. Application may be deadlocked. Lock row may require manual removal " +
"from the schema history table.");
}
Thread.sleep(1000);
} catch (InterruptedException ex) {
// Ignore - if interrupted, we still need to wait for lock to become available
}
}
}
private String generateDeleteExpiredLockStatement(String deleteExpiredLockStatementTemplate) {
LocalDateTime zonedDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(LOCK_TIMEOUT_MINS);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
return String.format(deleteExpiredLockStatementTemplate.replace("?", "%s"), zonedDateTime.format(formatter));
}
private boolean insertLockingRow(String insertStatementTemplate, String checksumValue, String booleanTrue, BiPredicate<JdbcTemplate, String> lockStrategy) {
String insertStatement = String.format(insertStatementTemplate.replace("?", "%s"),
-100,
"'" + tableLockString + "'",
"'" + FLYWAY_LOCK_STRING + "'",
"''",
"''",
checksumValue,
"''",
0,
booleanTrue
);
return lockStrategy.test(jdbcTemplate, insertStatement);
}
public void doUnlock(String deleteLockTemplate) throws SQLException {
stopLockWatchingThread();
String deleteLockStatement = String.format(deleteLockTemplate.replace("?", "%s"), tableLockString);
jdbcTemplate.execute(deleteLockStatement);
}
private String getNextRandomString() {
return new BigInteger(128, random).toString(16);
}
private ScheduledExecutorService createScheduledExecutor() {
return Executors.newScheduledThreadPool(NUM_THREADS, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
});
}
private ScheduledFuture<?> startLockWatchingThread(String updateLockStatement) {
Runnable lockUpdatingTask = () -> {
LOG.debug("Updating lock in Flyway schema history table");
jdbcTemplate.executeStatement(updateLockStatement);
};
return executor.scheduleAtFixedRate(lockUpdatingTask, 0, LOCK_TIMEOUT_MINS / 2, TimeUnit.MINUTES);
}
private void stopLockWatchingThread() {
scheduledFuture.cancel(true);
}
}