TestJobHistoryParsing.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.RackResolver;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestJobHistoryParsing {
private static final Logger LOG =
LoggerFactory.getLogger(TestJobHistoryParsing.class);
private static final String RACK_NAME = "/MyRackName";
private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
public static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
return Arrays.asList(new String[] { RACK_NAME });
}
@Override
public void reloadCachedMappings() {
}
@Override
public void reloadCachedMappings(List<String> names) {
}
}
@Test
@Timeout(value = 50)
public void testJobInfo() throws Exception {
JobInfo info = new JobInfo();
assertEquals("NORMAL", info.getPriority());
info.printAll();
}
@Test
@Timeout(value = 300)
public void testHistoryParsing() throws Exception {
LOG.info("STARTING testHistoryParsing()");
try {
checkHistoryParsing(2, 1, 2);
} finally {
LOG.info("FINISHED testHistoryParsing()");
}
}
@Test
@Timeout(value = 50)
public void testHistoryParsingWithParseErrors() throws Exception {
LOG.info("STARTING testHistoryParsingWithParseErrors()");
try {
checkHistoryParsing(3, 0, 2);
} finally {
LOG.info("FINISHED testHistoryParsingWithParseErrors()");
}
}
private static String getJobSummary(FileContext fc, Path path)
throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
private void checkHistoryParsing(final int numMaps, final int numReduces,
final int numSuccessfulMaps) throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
long amStartTimeEst = System.currentTimeMillis();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
.getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf);
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
} catch (IOException ioe) {
LOG.info("Can not get FileContext", ioe);
throw (new Exception("Can not get File Context"));
}
if (numMaps == numSuccessfulMaps) {
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobId);
Path summaryFile = new Path(jobhistoryDir, summaryFileName);
String jobSummaryString = getJobSummary(fc, summaryFile);
assertNotNull(jobSummaryString);
assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
Map<String, String> jobSummaryElements = new HashMap<String, String>();
StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
while (strToken.hasMoreTokens()) {
String keypair = strToken.nextToken();
jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
}
assertEquals(jobId.toString(), jobSummaryElements.get("jobId"),
"JobId does not match");
assertEquals("test", jobSummaryElements.get("jobName"), "JobName does not match");
assertTrue(Long.parseLong(jobSummaryElements.get("submitTime")) != 0,
"submitTime should not be 0");
assertTrue(Long.parseLong(jobSummaryElements.get("launchTime")) != 0,
"launchTime should not be 0");
assertTrue(Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0,
"firstMapTaskLaunchTime should not be 0");
assertTrue(Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0,
"firstReduceTaskLaunchTime should not be 0");
assertTrue(Long.parseLong(jobSummaryElements.get("finishTime")) != 0,
"finishTime should not be 0");
assertEquals(numSuccessfulMaps, Integer.parseInt(jobSummaryElements.get("numMaps")),
"Mismatch in num map slots");
assertEquals(numReduces, Integer.parseInt(jobSummaryElements.get("numReduces")),
"Mismatch in num reduce slots");
assertEquals(System.getProperty("user.name"), jobSummaryElements.get("user"),
"User does not match");
assertEquals("default", jobSummaryElements.get("queue"), "Queue does not match");
assertEquals("SUCCEEDED", jobSummaryElements.get("status"), "Status does not match");
}
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobInfo jobInfo;
long numFinishedMaps;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
LOG.info("JobHistoryFile is: " + historyFilePath);
try {
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
JobHistoryParser parser = new JobHistoryParser(in);
final EventReader realReader = new EventReader(in);
EventReader reader = Mockito.mock(EventReader.class);
if (numMaps == numSuccessfulMaps) {
reader = realReader;
} else {
final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
Mockito.when(reader.getNextEvent()).thenAnswer(
new Answer<HistoryEvent>() {
public HistoryEvent answer(InvocationOnMock invocation)
throws IOException {
HistoryEvent event = realReader.getNextEvent();
if (event instanceof TaskFinishedEvent) {
numFinishedEvents.incrementAndGet();
}
if (numFinishedEvents.get() <= numSuccessfulMaps) {
return event;
} else {
throw new IOException("test");
}
}
});
}
jobInfo = parser.parse(reader);
numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
if (numFinishedMaps != numMaps) {
Exception parseException = parser.getParseException();
assertNotNull(parseException, "Didn't get expected parse exception");
}
}
assertEquals(System.getProperty("user.name"),
jobInfo.getUsername(), "Incorrect username ");
assertEquals("test", jobInfo.getJobname(), "Incorrect jobName");
assertEquals("default", jobInfo.getJobQueueName(),
"Incorrect queuename");
assertEquals("test", jobInfo.getJobConfPath(), "incorrect conf path");
assertEquals(numSuccessfulMaps,
numFinishedMaps, "incorrect finishedMap ");
assertEquals(numReduces,
jobInfo.getSucceededReduces(), "incorrect finishedReduces ");
assertEquals(job.isUber(),
jobInfo.getUberized(), "incorrect uberized ");
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
int totalTasks = allTasks.size();
assertEquals((numMaps + numReduces), totalTasks,
"total number of tasks is incorrect");
// Verify aminfo
assertEquals(1, jobInfo.getAMInfos().size());
assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0)
.getNodeManagerHost());
AMInfo amInfo = jobInfo.getAMInfos().get(0);
assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
assertTrue(amInfo.getStartTime() <= System.currentTimeMillis()
&& amInfo.getStartTime() >= amStartTimeEst);
ContainerId fakeCid = MRApp.newContainerId(-1, -1, -1, -1);
// Assert at taskAttempt level
for (TaskInfo taskInfo : allTasks.values()) {
int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
assertEquals(1, taskAttemptCount, "total number of task attempts");
TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values()
.iterator().next();
assertNotNull(taInfo.getContainerId());
// Verify the wrong ctor is not being used. Remove after mrv1 is removed.
assertNotEquals(taInfo.getContainerId(), fakeCid);
}
// Deep compare Job and JobInfo
for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
assertNotNull(taskInfo, "TaskInfo not found");
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
TypeConverter.fromYarn((taskAttempt.getID())));
assertNotNull(taskAttemptInfo, "TaskAttemptInfo not found");
assertEquals(taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort(),
"Incorrect shuffle port for task attempt");
if (numMaps == numSuccessfulMaps) {
assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
// Verify rack-name
assertEquals(taskAttemptInfo.getRackname(), RACK_NAME,
"rack-name is incorrect");
}
}
}
// test output for HistoryViewer
PrintStream stdps = System.out;
try {
System.setOut(new PrintStream(outContent));
HistoryViewer viewer;
synchronized (fileInfo) {
viewer = new HistoryViewer(fc.makeQualified(
fileInfo.getHistoryFile()).toString(), conf, true);
}
viewer.print();
for (TaskInfo taskInfo : allTasks.values()) {
String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo
.getTaskStatus())
+ " "
+ taskInfo.getTaskType()
+ " task list for " + taskInfo.getTaskId().getJobID();
assertTrue(outContent.toString().indexOf(test) > 0);
assertTrue(outContent.toString().indexOf(
taskInfo.getTaskId().toString()) > 0);
}
} finally {
System.setOut(stdps);
}
}
// Computes finished maps similar to RecoveryService...
private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
int numSuccessfulMaps) {
if (numMaps == numSuccessfulMaps) {
return jobInfo.getSucceededMaps();
}
long numFinishedMaps = 0;
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
++numFinishedMaps;
}
}
return numFinishedMaps;
}
@Test
@Timeout(value = 30)
public void testHistoryParsingForFailedAttempts() throws Exception {
LOG.info("STARTING testHistoryParsingForFailedAttempts");
try {
Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobHistoryParser parser;
JobInfo jobInfo;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
parser = new JobHistoryParser(in);
jobInfo = parser.parse();
}
Exception parseException = parser.getParseException();
assertNull(parseException,
"Caught an expected exception " + parseException);
int noOffailedAttempts = 0;
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
TypeConverter.fromYarn((taskAttempt.getID())));
// Verify rack-name for all task attempts
assertThat(taskAttemptInfo.getRackname())
.withFailMessage("rack-name is incorrect").isEqualTo(RACK_NAME);
if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
noOffailedAttempts++;
}
}
}
assertEquals(2, noOffailedAttempts, "No of Failed tasks doesn't match.");
} finally {
LOG.info("FINISHED testHistoryParsingForFailedAttempts");
}
}
@Test
@Timeout(value = 30)
public void testHistoryParsingForKilledAndFailedAttempts() throws Exception {
MRApp app = null;
JobHistory jobHistory = null;
LOG.info("STARTING testHistoryParsingForKilledAndFailedAttempts");
try {
Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
// "CommitterEventHandler" thread could be slower in some cases,
// which might cause a failed map/reduce task to fail the job
// immediately (see JobImpl.checkJobAfterTaskCompletion()). If there are
// killed events in progress, those will not be counted. Instead,
// we allow a 50% failure rate, so the job will always succeed and kill
// events will not be ignored.
conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);
conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 50);
RackResolver.init(conf);
app = new MRAppWithHistoryWithFailedAndKilledTask(3, 3, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobHistoryParser parser;
JobInfo jobInfo;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
parser = new JobHistoryParser(in);
jobInfo = parser.parse();
}
Exception parseException = parser.getParseException();
assertNull(parseException,
"Caught an expected exception " + parseException);
assertEquals(1, jobInfo.getFailedMaps(), "FailedMaps");
assertEquals(1, jobInfo.getKilledMaps(), "KilledMaps");
assertEquals(1, jobInfo.getFailedReduces(), "FailedReduces");
assertEquals(1, jobInfo.getKilledReduces(), "KilledReduces");
} finally {
LOG.info("FINISHED testHistoryParsingForKilledAndFailedAttempts");
if (app != null) {
app.close();
}
if (jobHistory != null) {
jobHistory.close();
}
}
}
@Test
@Timeout(value = 60)
public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask");
try {
Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.FAILED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobHistoryParser parser;
JobInfo jobInfo;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
parser = new JobHistoryParser(in);
jobInfo = parser.parse();
}
Exception parseException = parser.getParseException();
assertNull(parseException, "Caught an expected exception " + parseException);
for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
assertNotNull(ct.getReport().getCounters(),
"completed task report has null counters");
}
final List<String> originalDiagnostics = job.getDiagnostics();
final String historyError = jobInfo.getErrorInfo();
assertTrue(originalDiagnostics != null && !originalDiagnostics.isEmpty(),
"No original diagnostics for a failed job");
assertNotNull(historyError, "No history error info for a failed job");
for (String diagString : originalDiagnostics) {
assertTrue(historyError.contains(diagString));
}
} finally {
LOG.info("FINISHED testCountersForFailedTask");
}
}
@Test
@Timeout(value = 60)
public void testDiagnosticsForKilledJob() throws Exception {
LOG.info("STARTING testDiagnosticsForKilledJob");
try {
final Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.KILLED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobHistoryParser parser;
JobInfo jobInfo;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
parser = new JobHistoryParser(in);
jobInfo = parser.parse();
}
Exception parseException = parser.getParseException();
assertNull(parseException, "Caught an expected exception " + parseException);
final List<String> originalDiagnostics = job.getDiagnostics();
final String historyError = jobInfo.getErrorInfo();
assertTrue(originalDiagnostics != null && !originalDiagnostics.isEmpty(),
"No original diagnostics for a failed job");
assertNotNull(historyError, "No history error info for a failed job ");
for (String diagString : originalDiagnostics) {
assertTrue(historyError.contains(diagString));
}
assertTrue(historyError.contains(JobImpl.JOB_KILLED_DIAG),
"No killed message in diagnostics");
} finally {
LOG.info("FINISHED testDiagnosticsForKilledJob");
}
}
@Test
@Timeout(value = 50)
public void testScanningOldDirs() throws Exception {
LOG.info("STARTING testScanningOldDirs");
try {
Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
hfm.init(conf);
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
assertNotNull(fileInfo, "Unable to locate job history");
// force the manager to "forget" the job
hfm.deleteJobFromJobListCache(fileInfo);
final int msecPerSleep = 10;
int msecToSleep = 10 * 1000;
while (fileInfo.isMovePending() && msecToSleep > 0) {
assertFalse(fileInfo.didMoveFail());
msecToSleep -= msecPerSleep;
Thread.sleep(msecPerSleep);
}
assertTrue(msecToSleep > 0, "Timeout waiting for history move");
fileInfo = hfm.getFileInfo(jobId);
hfm.stop();
assertNotNull(fileInfo, "Unable to locate old job history");
assertTrue(hfm.moveToDoneExecutor.isTerminated(),
"HistoryFileManager not shutdown properly");
} finally {
LOG.info("FINISHED testScanningOldDirs");
}
}
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
public MRAppWithHistoryWithFailedTask(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
static class MRAppWithHistoryWithFailedAndKilledTask
extends MRAppWithHistory {
MRAppWithHistoryWithFailedAndKilledTask(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
final int taskId = attemptID.getTaskId().getId();
final TaskType taskType = attemptID.getTaskId().getTaskType();
// map #0 --> kill
// reduce #0 --> fail
if (taskType == TaskType.MAP && taskId == 0) {
getContext().getEventHandler().handle(
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
} else if (taskType == TaskType.MAP && taskId == 1) {
getContext().getEventHandler().handle(
new TaskAttemptFailEvent(attemptID));
} else if (taskType == TaskType.REDUCE && taskId == 0) {
getContext().getEventHandler().handle(
new TaskAttemptFailEvent(attemptID));
} else if (taskType == TaskType.REDUCE && taskId == 1) {
getContext().getEventHandler().handle(
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory {
public MRAppWithHistoryWithJobKilled(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0) {
getContext().getEventHandler().handle(
new JobEvent(attemptID.getTaskId().getJobId(),
JobEventType.JOB_KILL));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
static class HistoryFileManagerForTest extends HistoryFileManager {
void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
jobListCache.delete(fileInfo);
}
}
public static void main(String[] args) throws Exception {
TestJobHistoryParsing t = new TestJobHistoryParsing();
t.testHistoryParsing();
t.testHistoryParsingForFailedAttempts();
}
/**
* Test clean old history files. Files should be deleted after 1 week by
* default.
*/
@Test
@Timeout(value = 15)
public void testDeleteFileInfo() throws Exception {
LOG.info("STARTING testDeleteFileInfo");
try {
Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
HistoryFileManager hfm = new HistoryFileManager();
hfm.init(conf);
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
hfm.initExisting();
// wait for move files form the done_intermediate directory to the gone
// directory
while (fileInfo.isMovePending()) {
Thread.sleep(300);
}
assertNotNull(hfm.jobListCache.values());
// try to remove fileInfo
hfm.clean();
// check that fileInfo does not deleted
assertFalse(fileInfo.isDeleted());
// correct live time
hfm.setMaxHistoryAge(-1);
hfm.clean();
hfm.stop();
assertTrue(hfm.moveToDoneExecutor.isTerminated(), "Thread pool shutdown");
// should be deleted !
assertTrue(fileInfo.isDeleted(), "file should be deleted ");
} finally {
LOG.info("FINISHED testDeleteFileInfo");
}
}
/**
* Simple test some methods of JobHistory
*/
@Test
@Timeout(value = 20)
public void testJobHistoryMethods() throws Exception {
LOG.info("STARTING testJobHistoryMethods");
try {
Configuration configuration = new Configuration();
configuration
.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(configuration);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
true);
app.submit(configuration);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
// make sure job history events are handled
app.waitForState(Service.STATE.STOPPED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(configuration);
// Method getAllJobs
assertEquals(1, jobHistory.getAllJobs().size());
// and with ApplicationId
assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());
JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
0L, System.currentTimeMillis() + 1, 0L,
System.currentTimeMillis() + 1, JobState.SUCCEEDED);
assertEquals(1, jobsinfo.getJobs().size());
assertNotNull(jobHistory.getApplicationAttemptId());
// test Application Id
assertEquals("application_0_0000", jobHistory.getApplicationID().toString());
assertEquals("Job History Server", jobHistory.getApplicationName());
// method does not work
assertNull(jobHistory.getEventHandler());
// method does not work
assertNull(jobHistory.getClock());
// method does not work
assertNull(jobHistory.getClusterInfo());
} finally {
LOG.info("FINISHED testJobHistoryMethods");
}
}
/**
* Simple test PartialJob
*/
@Test
@Timeout(value = 3)
public void testPartialJob() throws Exception {
JobId jobId = new JobIdPBImpl();
jobId.setId(0);
JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
"jobName", jobId, 3, 2, "JobStatus");
PartialJob test = new PartialJob(jii, jobId);
assertEquals(1.0f, test.getProgress(), 0.001f);
assertNull(test.getAllCounters());
assertNull(test.getTasks());
assertNull(test.getTasks(TaskType.MAP));
assertNull(test.getTask(new TaskIdPBImpl()));
assertNull(test.getTaskAttemptCompletionEvents(0, 100));
assertNull(test.getMapAttemptCompletionEvents(0, 100));
assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
assertNull(test.getAMInfos());
}
@Test
public void testMultipleFailedTasks() throws Exception {
JobHistoryParser parser =
new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
EventReader reader = Mockito.mock(EventReader.class);
final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
final org.apache.hadoop.mapreduce.TaskType taskType =
org.apache.hadoop.mapreduce.TaskType.MAP;
final TaskID[] tids = new TaskID[2];
final JobID jid = new JobID("1", 1);
tids[0] = new TaskID(jid, taskType, 0);
tids[1] = new TaskID(jid, taskType, 1);
Mockito.when(reader.getNextEvent()).thenAnswer(
new Answer<HistoryEvent>() {
public HistoryEvent answer(InvocationOnMock invocation)
throws IOException {
// send two task start and two task fail events for tasks 0 and 1
int eventId = numEventsRead.getAndIncrement();
TaskID tid = tids[eventId & 0x1];
if (eventId < 2) {
return new TaskStartedEvent(tid, 0, taskType, "");
}
if (eventId < 4) {
TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
"failed", "FAILED", null, new Counters());
tfe.setDatum(tfe.getDatum());
return tfe;
}
if (eventId < 5) {
JobUnsuccessfulCompletionEvent juce =
new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
0, 0, 0, 0,
"JOB_FAILED", Collections.singletonList(
"Task failed: " + tids[0].toString()));
return juce;
}
return null;
}
});
JobInfo info = parser.parse(reader);
assertTrue(info.getErrorInfo().contains(tids[0].toString()),
"Task 0 not implicated");
}
@Test
public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
final Path histPath = new Path(getClass().getClassLoader().getResource(
"job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
.getFile());
final FileSystem lfs = FileSystem.getLocal(new Configuration());
final FSDataInputStream fsdis = lfs.open(histPath);
try {
JobHistoryParser parser = new JobHistoryParser(fsdis);
JobInfo info = parser.parse();
assertEquals(info.getJobId(), JobID.forName("job_1393307629410_0001"),
"History parsed jobId incorrectly");
assertEquals("", info.getErrorInfo(), "Default diagnostics incorrect");
} finally {
fsdis.close();
}
}
/**
* Test compatibility of JobHistoryParser with 2.0.3-alpha history files
* @throws IOException
*/
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException
{
Path histPath = new Path(getClass().getClassLoader().getResource(
"job_2.0.3-alpha-FAILED.jhist").getFile());
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
+ jobInfo.getSucceededMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() ) ;
}
/**
* Test compatibility of JobHistoryParser with 2.4.0 history files
* @throws IOException
*/
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException
{
Path histPath = new Path(getClass().getClassLoader().getResource(
"job_2.4.0-FAILED.jhist").getFile());
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
+ jobInfo.getSucceededMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() );
}
/**
* Test compatibility of JobHistoryParser with 0.23.9 history files
* @throws IOException
*/
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException
{
Path histPath = new Path(getClass().getClassLoader().getResource(
"job_0.23.9-FAILED.jhist").getFile());
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
+ jobInfo.getSucceededMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() ) ;
}
}