InsertRowLock.java
/*-
* ========================LICENSE_START=================================
* flyway-core
* ========================================================================
* Copyright (C) 2010 - 2025 Red Gate Software Ltd
* ========================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/
package org.flywaydb.core.internal.database;
import lombok.CustomLog;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
import org.flywaydb.core.internal.jdbc.Results;
import java.math.BigInteger;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * A lock implemented by inserting a marker row through a {@link JdbcTemplate}.
 *
 * <p>{@link #doLock} repeatedly tries to insert a locking row (after first deleting expired lock rows) until
 * the insert succeeds, then schedules a background task that periodically runs the caller-supplied update
 * statement. {@link #doUnlock} cancels that task and runs the caller-supplied delete statement.
 *
 * <p>All SQL is supplied by the caller as templates in which every {@code ?} is substituted textually via
 * {@link String#format}; the statements are not bound as JDBC parameters.
 */
@CustomLog
public class InsertRowLock {
    /**
     * Age, in minutes, used to compute the cut-off timestamp passed to the delete-expired-lock statement.
     * The lock-refresh task runs every {@code LOCK_TIMEOUT_MINS / 2} minutes.
     */
    public static final int LOCK_TIMEOUT_MINS = 10;

    private static final Random random = new Random();
    private static final int NUM_THREADS = 2;

    /**
     * Format of the cut-off timestamp substituted into the delete-expired-lock statement.
     * DateTimeFormatter is immutable and thread-safe, so a single shared instance is sufficient.
     */
    private static final DateTimeFormatter EXPIRY_TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /**
     * A random string used as an ID for this instance of Flyway.
     */
    private final String tableLockString = getNextRandomString();
    private final JdbcTemplate jdbcTemplate;
    private final ScheduledExecutorService executor;

    /** Handle of the periodic lock-refresh task; {@code null} while no lock is held by this instance. */
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Creates a lock that issues its statements through the given template.
     *
     * @param jdbcTemplate the template used to run every lock-related statement
     */
    public InsertRowLock(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.executor = createScheduledExecutor();
    }

    /**
     * Blocks until the locking row has been inserted, then starts the periodic lock-refresh task.
     *
     * <p>Each attempt first runs the delete-expired-lock statement, then tries the insert. Between failed
     * attempts the method sleeps for one second. After 50 failed attempts every further attempt is logged at
     * error level, but the method keeps waiting.
     *
     * <p>An interrupt does not abort the wait. It is remembered and the calling thread's interrupt status is
     * restored when this method exits, so callers can still observe it.
     *
     * @param insertStatementTemplate    insert statement with nine {@code ?} placeholders
     * @param updateLockStatement        statement run periodically while the lock is held; its {@code ?} is
     *                                   replaced with this instance's lock ID (unquoted)
     * @param deleteExpiredLockStatement statement whose {@code ?} is replaced with the UTC cut-off timestamp
     *                                   (unquoted)
     * @param booleanTrue                the SQL literal substituted for the last insert placeholder
     * @throws SQLException if the delete-expired-lock statement fails
     */
    public void doLock(String insertStatementTemplate, String updateLockStatement, String deleteExpiredLockStatement, String booleanTrue) throws SQLException {
        int retryCount = 0;
        boolean interrupted = false;
        try {
            while (true) {
                jdbcTemplate.execute(generateDeleteExpiredLockStatement(deleteExpiredLockStatement));
                if (insertLockingRow(insertStatementTemplate, booleanTrue)) {
                    scheduledFuture = startLockWatchingThread(String.format(updateLockStatement.replace("?", "%s"), tableLockString));
                    return;
                }
                if (retryCount < 50) {
                    retryCount++;
                    LOG.debug("Waiting for lock on Flyway schema history table");
                } else {
                    LOG.error("Waiting for lock on Flyway schema history table. Application may be deadlocked. Lock row may require manual removal " +
                            "from the schema history table.");
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                    // We still need to wait for the lock to become available, so keep looping. Restoring the
                    // flag here would make the next sleep throw immediately; defer it to the finally block.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Fills the delete-expired-lock template with the timestamp {@link #LOCK_TIMEOUT_MINS} minutes before
     * the current UTC time.
     */
    private String generateDeleteExpiredLockStatement(String deleteExpiredLockStatementTemplate) {
        LocalDateTime expiryCutoff = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(LOCK_TIMEOUT_MINS);
        return String.format(deleteExpiredLockStatementTemplate.replace("?", "%s"), expiryCutoff.format(EXPIRY_TIMESTAMP_FORMAT));
    }

    /**
     * Attempts to insert the locking row.
     *
     * @return {@code true} if the insert completed without an exception, {@code false} otherwise
     */
    private boolean insertLockingRow(String insertStatementTemplate, String booleanTrue) {
        String insertStatement = String.format(insertStatementTemplate.replace("?", "%s"),
                -100,
                "'" + tableLockString + "'",
                "'flyway-lock'",
                "''",
                "''",
                0,
                "''",
                0,
                booleanTrue
        );
        // Insert the locking row - 'installed_rank' being a primary key prevents two lock rows
        // from existing at once, so a second concurrent insert fails.
        Results results = jdbcTemplate.executeStatement(insertStatement);
        // Succeed if there were no errors
        return results.getException() == null;
    }

    /**
     * Stops the lock-refresh task (if one is running) and deletes this instance's lock row.
     *
     * @param deleteLockTemplate delete statement whose {@code ?} is replaced with this instance's lock ID
     *                           (unquoted)
     * @throws SQLException if the delete statement fails
     */
    public void doUnlock(String deleteLockTemplate) throws SQLException {
        stopLockWatchingThread();
        String deleteLockStatement = String.format(deleteLockTemplate.replace("?", "%s"), tableLockString);
        jdbcTemplate.execute(deleteLockStatement);
    }

    /** Returns a random 128-bit value rendered as a hexadecimal string. */
    private String getNextRandomString() {
        return new BigInteger(128, random).toString(16);
    }

    /** Creates the scheduler for the lock-refresh task; its threads are daemons so they never block JVM exit. */
    private ScheduledExecutorService createScheduledExecutor() {
        return Executors.newScheduledThreadPool(NUM_THREADS, r -> {
            Thread t = Executors.defaultThreadFactory().newThread(r);
            t.setDaemon(true);
            return t;
        });
    }

    /**
     * Schedules the given statement to run immediately and then every {@code LOCK_TIMEOUT_MINS / 2} minutes.
     *
     * @return the handle used to cancel the periodic task
     */
    private ScheduledFuture<?> startLockWatchingThread(String updateLockStatement) {
        Runnable lockUpdatingTask = () -> {
            LOG.debug("Updating lock in Flyway schema history table");
            jdbcTemplate.executeStatement(updateLockStatement);
        };
        return executor.scheduleAtFixedRate(lockUpdatingTask, 0, LOCK_TIMEOUT_MINS / 2, TimeUnit.MINUTES);
    }

    /** Cancels the lock-refresh task, interrupting it if it is currently running; no-op if none was started. */
    private void stopLockWatchingThread() {
        ScheduledFuture<?> future = scheduledFuture;
        if (future != null) {
            future.cancel(true);
            scheduledFuture = null;
        }
    }
}