TestHistoryFileManager.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileNotFoundException;
import java.util.UUID;
import java.util.List;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestHistoryFileManager {
// First mini DFS cluster; built in setUpClass() and shut down in cleanUpClass().
private static MiniDFSCluster dfsCluster = null;
// Second mini DFS cluster; built in setUpClass() with a base directory suffixed by "_2".
private static MiniDFSCluster dfsCluster2 = null;
// Relative path of target/test-classes/core-site.xml; assigned in setUpClass(),
// written by testCreateDirsWithAdditionalFileSystem and deleted in cleanTest().
private static String coreSitePath;
/**
 * Computes the location of the generated core-site.xml and starts the two
 * mini DFS clusters shared by every test in this class.
 *
 * The second cluster uses the first cluster's base directory (or the
 * MiniDFSCluster default when none is configured) with "_2" appended.
 */
@BeforeAll
public static void setUpClass() throws Exception {
  coreSitePath = String.join(File.separator,
      ".", "target", "test-classes", "core-site.xml");

  Configuration primaryConf = new HdfsConfiguration();
  Configuration secondaryConf = new HdfsConfiguration();

  dfsCluster = new MiniDFSCluster.Builder(primaryConf).build();

  // Read the base directory only after the first cluster has been built.
  String primaryBaseDir = primaryConf.get(
      MiniDFSCluster.HDFS_MINIDFS_BASEDIR, MiniDFSCluster.getBaseDirectory());
  secondaryConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir + "_2");

  dfsCluster2 = new MiniDFSCluster.Builder(secondaryConf).build();
}
/**
 * Shuts down both mini DFS clusters.
 *
 * Each cluster is only shut down if it was actually created, so a failure in
 * setUpClass() does not turn into a NullPointerException here. The second
 * cluster is shut down in a finally block so that it is not leaked when
 * shutting down the first one throws.
 *
 * @throws Exception if shutting down a cluster fails
 */
@AfterAll
public static void cleanUpClass() throws Exception {
  try {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
  } finally {
    if (dfsCluster2 != null) {
      dfsCluster2.shutdown();
    }
  }
}
/**
 * Per-test cleanup: deletes the generated core-site.xml (if any) and asks
 * both clusters to leave safe mode.
 */
@AfterEach
public void cleanTest() throws Exception {
  File generatedCoreSite = new File(coreSitePath);
  generatedCoreSite.delete();

  // First cluster, then second cluster, each asked to leave safe mode.
  for (MiniDFSCluster cluster : new MiniDFSCluster[] {dfsCluster, dfsCluster2}) {
    cluster.getFileSystem().setSafeMode(SafeModeAction.LEAVE);
  }
}
/**
 * Builds the "done" directory path used by a test.
 *
 * @param name test-specific name to place directly under the root
 * @return {@code name} prefixed with a single slash
 */
private String getDoneDirNameForTest(String name) {
  StringBuilder dir = new StringBuilder("/");
  dir.append(name);
  return dir.toString();
}
/**
 * Builds the "intermediate done" directory path used by a test.
 *
 * @param name test-specific name appended to the fixed prefix
 * @return {@code name} prefixed with "/intermediate_"
 */
private String getIntermediateDoneDirNameForTest(String name) {
  StringBuilder dir = new StringBuilder("/intermediate_");
  dir.append(name);
  return dir.toString();
}
/**
 * Points the done and intermediate-done directories at per-test paths and
 * checks the result of {@code tryCreatingHistoryDirs(false)}.
 *
 * @param conf configuration to update and hand to the manager
 * @param expected value {@code tryCreatingHistoryDirs(false)} must return
 * @param methodName name used to derive the two directory paths
 */
private void testTryCreateHistoryDirs(Configuration conf, boolean expected, String methodName)
    throws Exception {
  String doneDir = getDoneDirNameForTest(methodName);
  String intermediateDoneDir = getIntermediateDoneDirNameForTest(methodName);
  conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDir);
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, intermediateDoneDir);

  HistoryFileManager manager = new HistoryFileManager();
  manager.conf = conf;
  boolean created = manager.tryCreatingHistoryDirs(false);
  assertEquals(expected, created);
}
/**
 * With the default file system set to "hdfs://localhost:1", the attempt to
 * create the history directories is expected to report false.
 */
@Test
public void testCreateDirsWithoutFileSystem(TestInfo testInfo) throws Exception {
  Configuration yarnConf = new YarnConfiguration();
  yarnConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:1");
  String testName = testInfo.getDisplayName();
  testTryCreateHistoryDirs(yarnConf, false, testName);
}
/**
 * With the first cluster out of safe mode, the attempt to create the
 * history directories is expected to report true.
 */
@Test
public void testCreateDirsWithFileSystem(TestInfo testInfo) throws Exception {
  dfsCluster.getFileSystem().setSafeMode(SafeModeAction.LEAVE);
  boolean inSafeMode = dfsCluster.getFileSystem().isInSafeMode();
  assertFalse(inSafeMode);

  Configuration clusterConf = dfsCluster.getConfiguration(0);
  testTryCreateHistoryDirs(clusterConf, true, testInfo.getDisplayName());
}
/**
 * Writes a core-site.xml whose default file system is the first cluster,
 * runs directory creation with the second cluster's configuration, and
 * checks that the directories exist on the first cluster only.
 *
 * The core-site.xml stream is opened with try-with-resources so that it is
 * closed even when {@code writeXml} throws.
 *
 * @param testInfo supplies the display name used to derive directory names
 * @throws Exception on any file system or I/O failure
 */
@Test
public void testCreateDirsWithAdditionalFileSystem(TestInfo testInfo) throws Exception {
  dfsCluster.getFileSystem().setSafeMode(
      SafeModeAction.LEAVE);
  dfsCluster2.getFileSystem().setSafeMode(
      SafeModeAction.LEAVE);
  assertFalse(dfsCluster.getFileSystem().isInSafeMode());
  assertFalse(dfsCluster2.getFileSystem().isInSafeMode());

  // Set default configuration to the first cluster
  Configuration conf = new Configuration(false);
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  try (FileOutputStream os = new FileOutputStream(coreSitePath)) {
    conf.writeXml(os);
  }

  String displayName = testInfo.getDisplayName();
  testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true, displayName);

  // Directories should be created only in the default file system (dfsCluster)
  Path doneDir = new Path(getDoneDirNameForTest(displayName));
  Path intermediateDoneDir = new Path(getIntermediateDoneDirNameForTest(displayName));
  assertTrue(dfsCluster.getFileSystem().exists(doneDir));
  assertTrue(dfsCluster.getFileSystem().exists(intermediateDoneDir));
  assertFalse(dfsCluster2.getFileSystem().exists(doneDir));
  assertFalse(dfsCluster2.getFileSystem().exists(intermediateDoneDir));
}
/**
 * With the first cluster put into safe mode, the attempt to create the
 * history directories is expected to report false.
 */
@Test
public void testCreateDirsWithFileSystemInSafeMode(TestInfo testInfo) throws Exception {
  dfsCluster.getFileSystem().setSafeMode(SafeModeAction.ENTER);
  boolean inSafeMode = dfsCluster.getFileSystem().isInSafeMode();
  assertTrue(inSafeMode);

  Configuration clusterConf = dfsCluster.getConfiguration(0);
  testTryCreateHistoryDirs(clusterConf, false, testInfo.getDisplayName());
}
/**
 * Points both history directories at fresh random paths and calls
 * {@code createHistoryDirs(clock, 500, 2000)} on a new manager.
 *
 * @param conf configuration to update and hand to the manager
 * @param clock clock passed through to {@code createHistoryDirs}
 */
private void testCreateHistoryDirs(Configuration conf, Clock clock)
    throws Exception {
  String doneDir = "/" + UUID.randomUUID();
  conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDir);
  String intermediateDoneDir = "/" + UUID.randomUUID();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, intermediateDoneDir);

  HistoryFileManager manager = new HistoryFileManager();
  manager.conf = conf;
  manager.createHistoryDirs(clock, 500, 2000);
}
/**
 * Puts the first cluster into safe mode, starts a helper thread that leaves
 * safe mode after 500 ms, and expects history directory creation to succeed
 * without throwing.
 *
 * The helper thread checks that the file system is no longer in safe mode
 * after requesting LEAVE. Anything thrown on the helper thread (including
 * assertion errors) is captured, and the thread is joined, so that a helper
 * failure actually fails this test instead of being lost on another thread.
 *
 * @throws Exception on any file system failure or if the join is interrupted
 */
@Test
public void testCreateDirsWithFileSystemBecomingAvailBeforeTimeout()
    throws Exception {
  dfsCluster.getFileSystem().setSafeMode(
      SafeModeAction.ENTER);
  assertTrue(dfsCluster.getFileSystem().isInSafeMode());

  // Single-slot holder written by the helper thread; Thread.join() below
  // makes the write visible to this thread.
  final Throwable[] helperFailure = new Throwable[1];
  Thread safeModeLeaver = new Thread() {
    @Override
    public void run() {
      try {
        Thread.sleep(500);
        dfsCluster.getFileSystem().setSafeMode(
            SafeModeAction.LEAVE);
        assertFalse(dfsCluster.getFileSystem().isInSafeMode());
      } catch (Throwable t) {
        // Throwable, not Exception: assertion failures are Errors.
        helperFailure[0] = t;
      }
    }
  };
  safeModeLeaver.start();
  try {
    testCreateHistoryDirs(dfsCluster.getConfiguration(0),
        SystemClock.getInstance());
  } finally {
    // Never leave the helper running into the next test.
    safeModeLeaver.join();
  }
  if (helperFailure[0] != null) {
    fail("Helper thread leaving safe mode failed: " + helperFailure[0],
        helperFailure[0]);
  }
}
/**
 * Keeps the first cluster in safe mode and advances a controlled clock from
 * 1 to 3000 on a helper thread after 500 ms; history directory creation is
 * expected to throw {@link YarnRuntimeException}.
 */
@Test
public void testCreateDirsWithFileSystemNotBecomingAvailBeforeTimeout()
    throws Exception {
  dfsCluster.getFileSystem().setSafeMode(SafeModeAction.ENTER);
  assertTrue(dfsCluster.getFileSystem().isInSafeMode());

  final ControlledClock controlledClock = new ControlledClock();
  controlledClock.setTime(1);

  Runnable advanceClock = () -> {
    try {
      Thread.sleep(500);
      controlledClock.setTime(3000);
    } catch (Exception ex) {
      fail(ex.toString());
    }
  };
  new Thread(advanceClock).start();

  assertThrows(YarnRuntimeException.class,
      () -> testCreateHistoryDirs(dfsCluster.getConfiguration(0), controlledClock));
}
/**
 * Calls {@code scanDirectory} with a mocked FileContext whose
 * {@code listStatus} throws FileNotFoundException and checks that a
 * non-null list still comes back.
 */
@Test
public void testScanDirectory() throws Exception {
  Path dir = new Path("any");
  FileContext mockedContext = mock(FileContext.class);
  when(mockedContext.makeQualified(dir)).thenReturn(dir);
  when(mockedContext.listStatus(dir)).thenThrow(new FileNotFoundException());

  List<FileStatus> statuses = HistoryFileManager.scanDirectory(dir, mockedContext, null);

  // Primarily, success is that an exception was not thrown. It is also nice
  // to check that a list was returned.
  assertNotNull(statuses);
}
/**
 * Calls {@code moveToDone()} on a HistoryFileInfo that has only a summary
 * file path (no history or conf file) and checks that the move is not
 * reported as failed.
 */
@Test
public void testHistoryFileInfoSummaryFileNotExist() throws Exception {
  final String jobName = "job_1410889000000_123456";

  Configuration clusterConf = dfsCluster.getConfiguration(0);
  clusterConf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
  clusterConf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());

  HistoryFileManagerTest manager = new HistoryFileManagerTest();
  manager.serviceInit(clusterConf);

  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobName)));
  Path summaryPath = new Path(jobName + ".summary");

  HistoryFileInfo fileInfo =
      manager.getHistoryFileInfo(null, null, summaryPath, indexInfo, false);
  fileInfo.moveToDone();
  assertFalse(fileInfo.didMoveFail());
}
/**
 * Configures a maximum of 5 loaded tasks, describes a job with 5 maps and
 * 5 reduces, and checks that {@code loadJob()} returns an UnparsedJob.
 */
@Test
public void testHistoryFileInfoLoadOversizedJobShouldReturnUnParsedJob()
    throws Exception {
  final int maxTasks = 5;

  Configuration clusterConf = dfsCluster.getConfiguration(0);
  clusterConf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, maxTasks);
  HistoryFileManagerTest manager = new HistoryFileManagerTest();
  manager.init(clusterConf);

  // Maps plus reduces together exceed the configured maximum.
  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName("job_1410889000000_123456")));
  indexInfo.setNumMaps(maxTasks);
  indexInfo.setNumReduces(maxTasks);

  HistoryFileInfo fileInfo =
      manager.getHistoryFileInfo(null, null, null, indexInfo, false);
  Job loaded = fileInfo.loadJob();

  boolean isUnparsed = loaded instanceof UnparsedJob;
  assertTrue(isUnparsed, "Should return an instance of UnparsedJob to indicate"
      + " the job history file is not parsed");
}
/**
 * Configures a task maximum one above the job's 100 maps plus 100 reduces
 * and checks that {@code loadJob()} parses the bundled history file into a
 * CompletedJob.
 */
@Test
public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob()
    throws Exception {
  final int tasksPerKind = 100;
  final int maxTasks = tasksPerKind + tasksPerKind + 1;

  Configuration clusterConf = dfsCluster.getConfiguration(0);
  clusterConf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, maxTasks);
  HistoryFileManagerTest manager = new HistoryFileManagerTest();
  manager.init(clusterConf);

  // The job's task count stays below the configured maximum, so the history
  // file is expected to be loaded in full.
  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName("job_1416424547277_0002")));
  indexInfo.setNumMaps(tasksPerKind);
  indexInfo.setNumReduces(tasksPerKind);

  final String historyResource = getClass().getClassLoader()
      .getResource("job_2.0.3-alpha-FAILED.jhist").getFile();
  final Path qualifiedHistoryPath =
      FileSystem.getLocal(clusterConf).makeQualified(new Path(historyResource));

  HistoryFileInfo fileInfo = manager.getHistoryFileInfo(
      qualifiedHistoryPath, null, null, indexInfo, false);
  Job loaded = fileInfo.loadJob();

  boolean isCompleted = loaded instanceof CompletedJob;
  assertTrue(isCompleted, "Should return an instance of CompletedJob as "
      + "a result of parsing the job history file of the job");
}
/**
 * Sets the loaded-tasks maximum to -1 and checks that {@code loadJob()}
 * still parses the bundled history file into a CompletedJob for a job with
 * 100 maps and 100 reduces.
 */
@Test
public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfiged()
    throws Exception {
  Configuration clusterConf = dfsCluster.getConfiguration(0);
  clusterConf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
  HistoryFileManagerTest manager = new HistoryFileManagerTest();
  manager.init(clusterConf);

  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName("job_1416424547277_0002")));
  indexInfo.setNumMaps(100);
  indexInfo.setNumReduces(100);

  final String historyResource = getClass().getClassLoader()
      .getResource("job_2.0.3-alpha-FAILED.jhist").getFile();
  final Path qualifiedHistoryPath =
      FileSystem.getLocal(clusterConf).makeQualified(new Path(historyResource));

  HistoryFileInfo fileInfo = manager.getHistoryFileInfo(
      qualifiedHistoryPath, null, null, indexInfo, false);
  Job loaded = fileInfo.loadJob();

  boolean isCompleted = loaded instanceof CompletedJob;
  assertTrue(isCompleted, "Should return an instance of CompletedJob as "
      + "a result of parsing the job history file of the job");
}
/**
 * Covers the case where the history and conf files already sit in the
 * "done" directory while the "intermediate" directory holds nothing, and
 * moveToDone() is then invoked for the intermediate paths. The call must
 * complete without a FileNotFoundException, leave no move pending, and
 * report the "done" locations for both files.
 */
@Test
public void testMoveToDoneAlreadyMovedSucceeds() throws Exception {
  final long jobTimestamp = 1535436603000L;
  final String jobName = "job_" + jobTimestamp + "_123456789";
  final String intermediateDir = "/" + UUID.randomUUID();
  final String doneDir = "/" + UUID.randomUUID();

  Configuration clusterConf = dfsCluster.getConfiguration(0);
  clusterConf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, intermediateDir);
  clusterConf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDir);

  // Files as they would be named in the (empty) intermediate directory.
  Path intermediateHistory = new Path(intermediateDir + "/" + jobName + ".jhist");
  Path intermediateConfFile = new Path(intermediateDir + "/" + jobName + "_conf.xml");

  // Files pre-created at their final location under the done directory.
  final String doneJobDir = doneDir + "/"
      + JobHistoryUtils.timestampDirectoryComponent(jobTimestamp) + "/123456/";
  Path doneHistory = new Path(doneJobDir + jobName + ".jhist");
  Path doneConfFile = new Path(doneJobDir + jobName + "_conf.xml");
  dfsCluster.getFileSystem().createNewFile(doneHistory);
  dfsCluster.getFileSystem().createNewFile(doneConfFile);

  HistoryFileManagerTest manager = new HistoryFileManagerTest();
  manager.serviceInit(clusterConf);

  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobName)));
  indexInfo.setFinishTime(jobTimestamp);

  HistoryFileInfo fileInfo = manager.getHistoryFileInfo(
      intermediateHistory, intermediateConfFile, null, indexInfo, false);
  fileInfo.moveToDone();

  assertFalse(fileInfo.isMovePending());
  String actualHistoryPath = fileInfo.getHistoryFile().toUri().getPath();
  assertEquals(doneHistory.toString(), actualHistoryPath);
  String actualConfPath = fileInfo.getConfFile().toUri().getPath();
  assertEquals(doneConfFile.toString(), actualConfPath);
}
/**
 * HistoryFileManager subclass that lets the tests construct
 * HistoryFileInfo instances directly.
 */
static class HistoryFileManagerTest extends HistoryFileManager {
  public HistoryFileManagerTest() {
    // Relies on the implicit call to the superclass no-arg constructor.
  }

  /**
   * Creates a HistoryFileInfo from the given paths and index info.
   *
   * @param historyFile path of the job history file, may be null
   * @param confFile path of the job configuration file, may be null
   * @param summaryFile path of the job summary file, may be null
   * @param jobIndexInfo index information describing the job
   * @param isInDone flag passed straight to the HistoryFileInfo constructor
   * @return the newly created HistoryFileInfo
   */
  public HistoryFileInfo getHistoryFileInfo(Path historyFile,
      Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
      boolean isInDone) {
    HistoryFileInfo created = new HistoryFileInfo(
        historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
    return created;
  }
}
}