TestBalanceProcedureScheduler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
/**
 * Test BalanceProcedureScheduler.
 */
public class TestBalanceProcedureScheduler {

  private static MiniDFSCluster cluster;
  private static final Configuration CONF = new Configuration();
  private static DistributedFileSystem fs;
  private static final int DEFAULT_BLOCK_SIZE = 512;

  /**
   * Start a 3-datanode MiniDFSCluster, configure the scheduler with a single
   * work thread and point the scheduler journal at a "/procedure" directory
   * on the cluster.
   */
  @BeforeAll
  public static void setup() throws IOException {
    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
        true);
    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///");
    CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    CONF.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
    CONF.setInt(WORK_THREAD_NUM, 1);

    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
    cluster.waitClusterUp();
    cluster.waitActive();

    fs = cluster.getFileSystem();
    String workPath =
        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
    CONF.set(SCHEDULER_JOURNAL_URI, workPath);
    fs.mkdirs(new Path(workPath));
  }

  /**
   * Shut down the MiniDFSCluster if it was started.
   */
  @AfterAll
  public static void close() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Test the scheduler could be shutdown correctly.
   */
  @Test
  @Timeout(value = 60)
  public void testShutdownScheduler() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    // construct job
    BalanceJob.Builder builder = new BalanceJob.Builder<>();
    builder.nextProcedure(new WaitProcedure("wait", 1000, 5 * 1000));
    BalanceJob job = builder.build();
    try {
      scheduler.submit(job);
      Thread.sleep(1000); // wait job to be scheduled.
    } finally {
      // Always stop the scheduler so its threads do not leak into other
      // tests, even if submitting or sleeping fails.
      scheduler.shutDownAndWait(30 * 1000);
    }
    BalanceJournal journal =
        ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
    journal.clear(job);
  }

  /**
   * Test a successful job.
   */
  @Test
  @Timeout(value = 60)
  public void testSuccessfulJob() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    try {
      // construct job
      List<RecordProcedure> procedures = new ArrayList<>();
      BalanceJob.Builder builder = new BalanceJob.Builder<RecordProcedure>();
      for (int i = 0; i < 5; i++) {
        RecordProcedure r = new RecordProcedure("record-" + i, 1000L);
        builder.nextProcedure(r);
        procedures.add(r);
      }
      BalanceJob<RecordProcedure> job = builder.build();

      scheduler.submit(job);
      scheduler.waitUntilDone(job);
      assertNull(job.getError());
      // verify finish list.
      assertEquals(5, RecordProcedure.getFinishList().size());
      for (int i = 0; i < RecordProcedure.getFinishList().size(); i++) {
        assertEquals(procedures.get(i), RecordProcedure.getFinishList().get(i));
      }
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test a job fails and the error can be got.
   */
  @Test
  @Timeout(value = 60)
  public void testFailedJob() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    try {
      // Mock bad procedure.
      BalanceProcedure badProcedure = mock(BalanceProcedure.class);
      doThrow(new IOException("Job failed exception."))
          .when(badProcedure).execute();
      doReturn("bad-procedure").when(badProcedure).name();

      BalanceJob.Builder builder = new BalanceJob.Builder<>();
      builder.nextProcedure(badProcedure);
      BalanceJob job = builder.build();
      scheduler.submit(job);
      scheduler.waitUntilDone(job);
      GenericTestUtils
          .assertExceptionContains("Job failed exception", job.getError());
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test recover a job. After the job is recovered, the job should start from
   * the last unfinished procedure, which is the first procedure without
   * journal.
   */
  @Test
  @Timeout(value = 60)
  public void testGetJobAfterRecover() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    try {
      // Construct job.
      BalanceJob.Builder builder = new BalanceJob.Builder<>();
      String firstProcedure = "wait0";
      WaitProcedure[] procedures = new WaitProcedure[5];
      for (int i = 0; i < 5; i++) {
        WaitProcedure procedure = new WaitProcedure("wait" + i, 1000, 1000);
        builder.nextProcedure(procedure).removeAfterDone(false);
        procedures[i] = procedure;
      }
      BalanceJob job = builder.build();
      scheduler.submit(job);

      // Sleep a random time (1 to 5 seconds) then shut down. nextInt(bound)
      // is never negative, unlike Math.abs(nextInt()) which stays negative
      // for Integer.MIN_VALUE and would make Thread.sleep throw.
      long randomSleepTime = new Random().nextInt(5) * 1000L + 1000L;
      Thread.sleep(randomSleepTime);
      scheduler.shutDownAndWait(2);

      // Current procedure is the last unfinished procedure. It is also the
      // first procedure without journal.
      WaitProcedure recoverProcedure = (WaitProcedure) job.getCurProcedure();
      int recoverIndex = -1;
      for (int i = 0; i < procedures.length; i++) {
        if (procedures[i].name().equals(recoverProcedure.name())) {
          recoverIndex = i;
          break;
        }
      }
      // Fail with a clear message instead of an index error further down.
      assertTrue(recoverIndex >= 0, "Current procedure "
          + recoverProcedure.name() + " is not part of the submitted job.");

      // Restart scheduler and recover the job.
      scheduler = new BalanceProcedureScheduler(CONF);
      scheduler.init(true);
      scheduler.waitUntilDone(job);

      // The job should be done successfully and the recoverJob should be equal
      // to the original job.
      BalanceJob recoverJob = scheduler.findJob(job);
      assertNull(recoverJob.getError());
      assertNotSame(job, recoverJob);
      assertEquals(job, recoverJob);
      // Verify whether the recovered job starts from the recoverProcedure.
      Map<String, WaitProcedure> pTable = recoverJob.getProcedureTable();
      List<WaitProcedure> recoveredProcedures =
          procedureTableToList(pTable, firstProcedure);
      for (int i = 0; i < recoverIndex; i++) {
        // All procedures before recoverProcedure shouldn't be executed.
        assertFalse(recoveredProcedures.get(i).getExecuted());
      }
      for (int i = recoverIndex; i < procedures.length; i++) {
        // All procedures start from recoverProcedure should be executed.
        assertTrue(recoveredProcedures.get(i).getExecuted());
      }
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test RetryException is handled correctly.
   */
  @Test
  @Timeout(value = 60)
  public void testRetry() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    try {
      // construct job
      BalanceJob.Builder builder = new BalanceJob.Builder<>();
      RetryProcedure retryProcedure = new RetryProcedure("retry", 1000, 3);
      builder.nextProcedure(retryProcedure);
      BalanceJob job = builder.build();

      long start = Time.monotonicNow();
      scheduler.submit(job);
      scheduler.waitUntilDone(job);
      assertNull(job.getError());

      long duration = Time.monotonicNow() - start;
      assertTrue(duration > 1000 * 3, "Job finished in " + duration
          + " ms, expected more than " + (1000 * 3) + " ms.");
      assertEquals(3, retryProcedure.getTotalRetry());
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test schedule an empty job.
   */
  @Test
  @Timeout(value = 60)
  public void testEmptyJob() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    try {
      BalanceJob job = new BalanceJob.Builder<>().build();

      scheduler.submit(job);
      scheduler.waitUntilDone(job);
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test serialization and deserialization of Job.
   */
  @Test
  @Timeout(value = 60)
  public void testJobSerializeAndDeserialize() throws Exception {
    BalanceJob.Builder builder = new BalanceJob.Builder<RecordProcedure>();
    for (int i = 0; i < 5; i++) {
      RecordProcedure r = new RecordProcedure("record-" + i, 1000L);
      builder.nextProcedure(r);
    }
    builder.nextProcedure(new RetryProcedure("retry", 1000, 3));
    BalanceJob<RecordProcedure> job = builder.build();
    job.setId(BalanceProcedureScheduler.allocateJobId());
    // Serialize.
    ByteArrayOutputStream bao = new ByteArrayOutputStream();
    job.write(new DataOutputStream(bao));
    bao.flush();
    ByteArrayInputStream bai = new ByteArrayInputStream(bao.toByteArray());
    // Deserialize.
    BalanceJob newJob = new BalanceJob.Builder<>().build();
    newJob.readFields(new DataInputStream(bai));
    assertEquals(job, newJob);
  }

  /**
   * Test scheduler crashes and recovers.
   */
  @Test
  @Timeout(value = 180)
  public void testSchedulerDownAndRecoverJob() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    Path parent = new Path("/testSchedulerDownAndRecoverJob");
    try {
      // construct job
      BalanceJob.Builder builder = new BalanceJob.Builder<>();
      MultiPhaseProcedure multiPhaseProcedure =
          new MultiPhaseProcedure("retry", 1000, 10, CONF, parent.toString());
      builder.nextProcedure(multiPhaseProcedure);
      BalanceJob job = builder.build();

      scheduler.submit(job);
      Thread.sleep(500); // wait procedure to be scheduled.
      scheduler.shutDownAndWait(2);

      assertFalse(job.isJobDone());
      int len = fs.listStatus(parent).length;
      assertTrue(len > 0 && len < 10);
      // restart scheduler, test recovering the job.
      scheduler = new BalanceProcedureScheduler(CONF);
      scheduler.init(true);
      scheduler.waitUntilDone(job);

      assertEquals(10, fs.listStatus(parent).length);
      for (int i = 0; i < 10; i++) {
        assertTrue(fs.exists(new Path(parent, "phase-" + i)));
      }

      BalanceJob recoverJob = scheduler.findJob(job);
      assertNull(recoverJob.getError());
      assertNotSame(job, recoverJob);
      assertEquals(job, recoverJob);
    } finally {
      // Stop the scheduler before removing the test directory so a procedure
      // that is still running cannot recreate files under it after cleanup.
      try {
        scheduler.shutDownAndWait(2);
      } finally {
        if (fs.exists(parent)) {
          fs.delete(parent, true);
        }
      }
    }
  }

  /**
   * Test a job saved in the journal is picked up by a newly initialized
   * scheduler. The journaled job has its current procedure set to wait1, and
   * the run must take at least 1s but less than 5s, i.e. it resumes from
   * wait1 instead of re-running wait0.
   */
  @Test
  @Timeout(value = 60)
  public void testRecoverJobFromJournal() throws Exception {
    BalanceJournal journal =
        ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
    // The job only holds WaitProcedures, so don't type the builder with
    // RecordProcedure.
    BalanceJob.Builder builder = new BalanceJob.Builder<>();
    BalanceProcedure wait0 = new WaitProcedure("wait0", 1000, 5000);
    BalanceProcedure wait1 = new WaitProcedure("wait1", 1000, 1000);
    builder.nextProcedure(wait0).nextProcedure(wait1);

    BalanceJob job = builder.build();
    job.setId(BalanceProcedureScheduler.allocateJobId());
    job.setCurrentProcedure(wait1);
    job.setLastProcedure(null);
    journal.saveJob(job);

    long start = Time.monotonicNow();
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);
    try {
      scheduler.waitUntilDone(job);
      long duration = Time.monotonicNow() - start;
      assertTrue(duration >= 1000 && duration < 5000);
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test the scheduler copes with a journal whose first clear() call fails:
   * the mocked journal throws on the first clear and succeeds afterwards,
   * and clear must end up being invoked exactly twice.
   */
  @Test
  @Timeout(value = 60)
  public void testClearJournalFail() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);

    BalanceJournal journal = mock(BalanceJournal.class);
    AtomicInteger count = new AtomicInteger(0);
    doAnswer(invocation -> {
      if (count.incrementAndGet() == 1) {
        throw new IOException("Mock clear failure");
      }
      return null;
    }).when(journal).clear(any(BalanceJob.class));
    scheduler.setJournal(journal);

    try {
      BalanceJob.Builder builder = new BalanceJob.Builder<>();
      builder.nextProcedure(new WaitProcedure("wait", 1000, 1000));
      BalanceJob job = builder.build();
      scheduler.submit(job);
      scheduler.waitUntilDone(job);
      assertEquals(2, count.get());
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Test the job will be recovered if writing journal fails.
   */
  @Test
  @Timeout(value = 60)
  public void testJobRecoveryWhenWriteJournalFail() throws Exception {
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
    scheduler.init(true);

    try {
      // construct job
      AtomicBoolean recoverFlag = new AtomicBoolean(true);
      BalanceJob.Builder builder = new BalanceJob.Builder<>();
      // Procedure names must be unique: the job's procedure table is keyed by
      // name, so a repeated name would shadow the earlier procedure.
      builder.nextProcedure(new WaitProcedure("wait0", 1000, 1000))
          .nextProcedure(
              new UnrecoverableProcedure("shutdown", 1000, () -> {
                cluster.restartNameNode(false);
                return true;
              })).nextProcedure(
              new UnrecoverableProcedure("recoverFlag", 1000, () -> {
                recoverFlag.set(false);
                return true;
              })).nextProcedure(new WaitProcedure("wait1", 1000, 1000));

      BalanceJob job = builder.build();
      scheduler.submit(job);
      scheduler.waitUntilDone(job);
      assertTrue(job.isJobDone());
      assertNull(job.getError());
      assertTrue(recoverFlag.get());
    } finally {
      scheduler.shutDownAndWait(2);
    }
  }

  /**
   * Transform the procedure map into an ordered list based on the relations
   * specified by the map.
   *
   * @param pTable procedure table keyed by procedure name.
   * @param first name of the procedure to start from.
   * @param <T> procedure type.
   * @return the procedures reachable from {@code first} by following each
   *         procedure's {@code nextProcedure()} name, in order; empty if
   *         {@code first} is not in the table.
   */
  <T extends BalanceProcedure> List<T> procedureTableToList(
      Map<String, T> pTable, String first) {
    List<T> procedures = new ArrayList<>();
    T cur = pTable.get(first);
    while (cur != null) {
      procedures.add(cur);
      cur = pTable.get(cur.nextProcedure());
    }
    return procedures;
  }
}