TestJobHistoryEventHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestJobHistoryEventHandler {
private static final Logger LOG = LoggerFactory
.getLogger(TestJobHistoryEventHandler.class);
private static MiniDFSCluster dfsCluster = null;
private static String coreSitePath;
@BeforeAll
public static void setUpClass() throws Exception {
coreSitePath = "." + File.separator + "target" + File.separator +
"test-classes" + File.separator + "core-site.xml";
Configuration conf = new HdfsConfiguration();
dfsCluster = new MiniDFSCluster.Builder(conf).build();
}
@AfterAll
public static void cleanUpClass() throws Exception {
dfsCluster.shutdown();
}
@AfterEach
public void cleanTest() throws Exception {
new File(coreSitePath).delete();
}
@Test
@Timeout(value = 50)
public void testFirstFlushOnCompletionEvent() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
60 * 1000l);
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 200);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0; i < 100; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(
t.taskID, 0, TaskType.MAP, "")));
}
handleNextNEvents(jheh, 100);
verify(mockWriter, times(0)).flush();
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
verify(mockWriter).flush();
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
@Timeout(value = 50)
public void testMaxUnflushedCompletionEvents() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
60 * 1000l);
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
verify(mockWriter, times(0)).flush();
handleNextNEvents(jheh, 1);
verify(mockWriter).flush();
handleNextNEvents(jheh, 50);
verify(mockWriter, times(6)).flush();
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
@Timeout(value = 50)
public void testUnflushedTimer() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
2 * 1000l); //2 seconds.
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 100);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
assertTrue(jheh.getFlushTimerStatus());
verify(mockWriter, times(0)).flush();
Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
verify(mockWriter).flush();
assertFalse(jheh.getFlushTimerStatus());
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
@Timeout(value = 50)
public void testBatchedFlushJobEndMultiplier() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
60 * 1000l); //2 seconds.
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 3);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 0);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, 0, 0, null, null,
new Counters())));
handleNextNEvents(jheh, 29);
verify(mockWriter, times(0)).flush();
handleNextNEvents(jheh, 72);
verify(mockWriter, times(4)).flush(); //3 * 30 + 1 for JobFinished
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
// In case of all types of events, process Done files if it's last AM retry
@Test
@Timeout(value = 50)
public void testProcessDoneFilesOnLastAMRetry() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(2)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(3)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(4)).processDoneFiles(any(JobId.class));
mockWriter = jheh.getEventWriter();
verify(mockWriter, times(5)).write(any(HistoryEvent.class));
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
// Skip processing Done files in case of ERROR, if it's not last AM retry
@Test
@Timeout(value = 50)
public void testProcessDoneFilesNotLastAMRetry() throws Exception {
TestParams t = new TestParams(false);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(t.jobId);
// skip processing done files
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(0)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(2)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(3)).processDoneFiles(t.jobId);
mockWriter = jheh.getEventWriter();
verify(mockWriter, times(5)).write(any(HistoryEvent.class));
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
public void testPropertyRedactionForJHS() throws Exception {
final Configuration conf = new Configuration();
String sensitivePropertyName = "aws.fake.credentials.name";
String sensitivePropertyValue = "aws.fake.credentials.val";
conf.set(sensitivePropertyName, sensitivePropertyValue);
conf.set(MRJobConfig.MR_JOB_REDACTED_PROPERTIES,
sensitivePropertyName);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
dfsCluster.getURI().toString());
final TestParams params = new TestParams();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, params.dfsWorkDir);
final JHEvenHandlerForTest jheh =
new JHEvenHandlerForTest(params.mockAppContext, 0, false);
try {
jheh.init(conf);
jheh.start();
handleEvent(jheh, new JobHistoryEvent(params.jobId,
new AMStartedEvent(params.appAttemptId, 200, params.containerId,
"nmhost", 3000, 4000, -1)));
handleEvent(jheh, new JobHistoryEvent(params.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(
params.jobId), 0, 0, 0, 0, 0, 0, 0,
JobStateInternal.FAILED.toString())));
// verify the value of the sensitive property in job.xml is restored.
assertThat(conf.get(sensitivePropertyName))
.isEqualTo(sensitivePropertyValue)
.withFailMessage(sensitivePropertyName + " is modified.");
// load the job_conf.xml in JHS directory and verify property redaction.
Path jhsJobConfFile = getJobConfInIntermediateDoneDir(conf, params.jobId);
assertTrue(FileContext.getFileContext(conf).util().exists(jhsJobConfFile),
"The job_conf.xml file is not in the JHS directory");
Configuration jhsJobConf = new Configuration();
try (InputStream input = FileSystem.get(conf).open(jhsJobConfFile)) {
jhsJobConf.addResource(input);
assertEquals(MRJobConfUtil.REDACTION_REPLACEMENT_VAL,
jhsJobConf.get(sensitivePropertyName),
sensitivePropertyName + " is not redacted in HDFS.");
}
} finally {
jheh.stop();
purgeHdfsHistoryIntermediateDoneDirectory(conf);
}
}
private static Path getJobConfInIntermediateDoneDir(Configuration conf,
JobId jobId) throws IOException {
Path userDoneDir = new Path(
JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf));
Path doneDirPrefix =
FileContext.getFileContext(conf).makeQualified(userDoneDir);
return new Path(
doneDirPrefix, JobHistoryUtils.getIntermediateConfFileName(jobId));
}
private void purgeHdfsHistoryIntermediateDoneDirectory(Configuration conf)
throws IOException {
FileSystem fs = FileSystem.get(dfsCluster.getConfiguration(0));
String intermDoneDirPrefix =
JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
fs.delete(new Path(intermDoneDirPrefix), true);
}
@Test
@Timeout(value = 50)
public void testDefaultFsIsUsedForHistory() throws Exception {
// Create default configuration pointing to the minicluster
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
dfsCluster.getURI().toString());
FileOutputStream os = new FileOutputStream(coreSitePath);
conf.writeXml(os);
os.close();
// simulate execution under a non-default namenode
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
"file:///");
TestParams t = new TestParams();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
// If we got here then event handler worked but we don't know with which
// file system. Now we check that history stuff was written to minicluster
FileSystem dfsFileSystem = dfsCluster.getFileSystem();
assertTrue(dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0,
"Minicluster contains some history files");
FileSystem localFileSystem = LocalFileSystem.get(conf);
assertFalse(localFileSystem.exists(new Path(t.dfsWorkDir)),
"No history directory on non-default file system");
} finally {
jheh.stop();
purgeHdfsHistoryIntermediateDoneDirectory(conf);
}
}
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
// Test relative path
Configuration conf = new Configuration();
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
"/mapred/history/done_intermediate");
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
assertEquals("/mapred/history/done_intermediate/" +
System.getProperty("user.name"), pathStr);
// Test fully qualified path
// Create default configuration pointing to the minicluster
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
dfsCluster.getURI().toString());
FileOutputStream os = new FileOutputStream(coreSitePath);
conf.writeXml(os);
os.close();
// Simulate execution under a non-default namenode
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
"file:///");
pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
assertEquals(dfsCluster.getURI().toString() +
"/mapred/history/done_intermediate/" + System.getProperty("user.name"),
pathStr);
}
// test AMStartedEvent for submitTime and startTime
@Test
@Timeout(value = 50)
public void testAMStartedEvent() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, 100)));
JobHistoryEventHandler.MetaInfo mi =
JobHistoryEventHandler.fileMap.get(t.jobId);
assertThat(mi.getJobIndexInfo().getSubmitTime()).isEqualTo(100);
assertThat(mi.getJobIndexInfo().getJobStartTime()).isEqualTo(200);
assertThat(mi.getJobSummary().getJobSubmitTime()).isEqualTo(100);
assertThat(mi.getJobSummary().getJobLaunchTime()).isEqualTo(200);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
assertThat(mi.getJobIndexInfo().getSubmitTime()).isEqualTo(100);
assertThat(mi.getJobIndexInfo().getJobStartTime()).isEqualTo(200);
assertThat(mi.getJobSummary().getJobSubmitTime()).isEqualTo(100);
assertThat(mi.getJobSummary().getJobLaunchTime()).isEqualTo(200);
verify(jheh, times(1)).processDoneFiles(t.jobId);
mockWriter = jheh.getEventWriter();
verify(mockWriter, times(2)).write(any(HistoryEvent.class));
} finally {
jheh.stop();
}
}
// Have JobHistoryEventHandler handle some events and make sure they get
// stored to the Timeline store
@Test
@Timeout(value = 50)
public void testTimelineEventHandling() throws Exception {
TestParams t = new TestParams(RunningAppContext.class, false);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
long currentTime = System.currentTimeMillis();
try (MiniYARNCluster yarnCluster = new MiniYARNCluster(
TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1)) {
yarnCluster.init(conf);
yarnCluster.start();
Configuration confJHEH = new YarnConfiguration(conf);
confJHEH.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
confJHEH.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":" +
yarnCluster.getApplicationHistoryServer().getPort());
JHEvenHandlerForTest jheh = new JHEvenHandlerForTest(t.mockAppContext, 0);
jheh.init(confJHEH);
jheh.start();
TimelineStore ts = yarnCluster.getApplicationHistoryServer()
.getTimelineStore();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
currentTime - 10));
jheh.getDispatcher().await();
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
TimelineEntity tEntity = entities.getEntities().get(0);
assertEquals(t.jobId.toString(), tEntity.getEntityId());
assertEquals(1, tEntity.getEvents().size());
assertEquals(EventType.AM_STARTED.toString(), tEntity.getEvents().get(0).getEventType());
assertEquals(currentTime - 10, tEntity.getEvents().get(0).getTimestamp());
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobSubmittedEvent(TypeConverter.fromYarn(t.jobId), "name",
"user", 200, "/foo/job.xml",
new HashMap<JobACL, AccessControlList>(), "default"),
currentTime + 10));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
assertEquals(t.jobId.toString(), tEntity.getEntityId());
assertEquals(2, tEntity.getEvents().size());
assertEquals(EventType.JOB_SUBMITTED.toString(), tEntity.getEvents().get(0).getEventType());
assertEquals(EventType.AM_STARTED.toString(), tEntity.getEvents().get(1).getEventType());
assertEquals(currentTime + 10, tEntity.getEvents().get(0).getTimestamp());
assertEquals(currentTime - 10, tEntity.getEvents().get(1).getTimestamp());
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
currentTime - 20));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
assertEquals(t.jobId.toString(), tEntity.getEntityId());
assertEquals(3, tEntity.getEvents().size());
assertEquals(EventType.JOB_SUBMITTED.toString(), tEntity.getEvents().get(0).getEventType());
assertEquals(EventType.AM_STARTED.toString(), tEntity.getEvents().get(1).getEventType());
assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
tEntity.getEvents().get(2).getEventType());
assertEquals(currentTime + 10, tEntity.getEvents().get(0).getTimestamp());
assertEquals(currentTime - 10, tEntity.getEvents().get(1).getTimestamp());
assertEquals(currentTime - 20, tEntity.getEvents().get(2).getTimestamp());
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
0, 0, 0, new Counters(), new Counters(), new Counters()), currentTime));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
assertEquals(t.jobId.toString(), tEntity.getEntityId());
assertEquals(4, tEntity.getEvents().size());
assertEquals(EventType.JOB_SUBMITTED.toString(), tEntity.getEvents().get(0).getEventType());
assertEquals(EventType.JOB_FINISHED.toString(), tEntity.getEvents().get(1).getEventType());
assertEquals(EventType.AM_STARTED.toString(), tEntity.getEvents().get(2).getEventType());
assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
tEntity.getEvents().get(3).getEventType());
assertEquals(currentTime + 10, tEntity.getEvents().get(0).getTimestamp());
assertEquals(currentTime, tEntity.getEvents().get(1).getTimestamp());
assertEquals(currentTime - 10, tEntity.getEvents().get(2).getTimestamp());
assertEquals(currentTime - 20, tEntity.getEvents().get(3).getTimestamp());
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
0, 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString()),
currentTime + 20));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
assertEquals(t.jobId.toString(), tEntity.getEntityId());
assertEquals(5, tEntity.getEvents().size());
assertEquals(EventType.JOB_KILLED.toString(), tEntity.getEvents().get(0).getEventType());
assertEquals(EventType.JOB_SUBMITTED.toString(), tEntity.getEvents().get(1).getEventType());
assertEquals(EventType.JOB_FINISHED.toString(), tEntity.getEvents().get(2).getEventType());
assertEquals(EventType.AM_STARTED.toString(), tEntity.getEvents().get(3).getEventType());
assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
tEntity.getEvents().get(4).getEventType());
assertEquals(currentTime + 20, tEntity.getEvents().get(0).getTimestamp());
assertEquals(currentTime + 10, tEntity.getEvents().get(1).getTimestamp());
assertEquals(currentTime, tEntity.getEvents().get(2).getTimestamp());
assertEquals(currentTime - 10, tEntity.getEvents().get(3).getTimestamp());
assertEquals(currentTime - 20, tEntity.getEvents().get(4).getTimestamp());
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
assertEquals(t.taskID.toString(), tEntity.getEntityId());
assertEquals(1, tEntity.getEvents().size());
assertEquals(EventType.TASK_STARTED.toString(), tEntity.getEvents().get(0).getEventType());
assertEquals(TaskType.MAP.toString(),
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null);
assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
assertEquals(t.taskID.toString(), tEntity.getEntityId());
assertEquals(2, tEntity.getEvents().size());
assertEquals(EventType.TASK_STARTED.toString(), tEntity.getEvents().get(1).getEventType());
assertEquals(TaskType.REDUCE.toString(),
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
assertEquals(TaskType.MAP.toString(),
tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
}
}
@Test
@Timeout(value = 50)
public void testCountersToJSON() throws Exception {
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
Counters counters = new Counters();
CounterGroup group1 = counters.addGroup("DOCTORS",
"Incarnations of the Doctor");
group1.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
group1.addCounter("MATT_SMITH", "Matt Smith", 11);
group1.addCounter("DAVID_TENNANT", "David Tennant", 10);
CounterGroup group2 = counters.addGroup("COMPANIONS",
"Companions of the Doctor");
group2.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
group2.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
group2.addCounter("AMY_POND", "Amy Pond", 4);
group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
+ "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
+ ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\","
+ "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":"
+ "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2},"
+ "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\","
+ "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory "
+ "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":"
+ "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\""
+ ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":"
+ "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10},"
+ "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
+ "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
+ "\"VALUE\":12}]}]";
assertEquals(expected, jsonStr);
}
@Test
@Timeout(value = 50)
public void testCountersToJSONEmpty() throws Exception {
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
Counters counters = null;
JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[]";
assertEquals(expected, jsonStr);
counters = new Counters();
jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[]";
assertEquals(expected, jsonStr);
counters.addGroup("DOCTORS", "Incarnations of the Doctor");
jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
+ "Doctor\",\"COUNTERS\":[]}]";
assertEquals(expected, jsonStr);
}
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
jheh.handle(event);
}
private void handleEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event)
throws InterruptedException {
jheh.handle(event);
jheh.handleEvent(jheh.eventQueue.take());
}
private void handleNextNEvents(JHEvenHandlerForTest jheh, int numEvents)
throws InterruptedException {
for (int i = 0; i < numEvents; i++) {
jheh.handleEvent(jheh.eventQueue.take());
}
}
private String setupTestWorkDir() {
File testWorkDir = new File("target", this.getClass().getCanonicalName());
try {
FileContext.getLocalFSFileContext().delete(
new Path(testWorkDir.getAbsolutePath()), true);
return testWorkDir.getAbsolutePath();
} catch (Exception e) {
LOG.warn("Could not cleanup", e);
throw new YarnRuntimeException("could not cleanup test dir", e);
}
}
private Job mockJob() {
Job mockJob = mock(Job.class);
when(mockJob.getAllCounters()).thenReturn(new Counters());
when(mockJob.getTotalMaps()).thenReturn(10);
when(mockJob.getTotalReduces()).thenReturn(10);
when(mockJob.getName()).thenReturn("mockjob");
return mockJob;
}
private AppContext mockAppContext(Class<? extends AppContext> contextClass,
ApplicationId appId, boolean isLastAMRetry) {
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
AppContext mockContext = mock(contextClass);
Job mockJob = mockJob();
when(mockContext.getJob(jobId)).thenReturn(mockJob);
when(mockContext.getApplicationID()).thenReturn(appId);
when(mockContext.isLastAMRetry()).thenReturn(isLastAMRetry);
if (mockContext instanceof RunningAppContext) {
when(((RunningAppContext)mockContext).getTimelineClient()).
thenReturn(TimelineClient.createTimelineClient());
when(((RunningAppContext) mockContext).getTimelineV2Client())
.thenReturn(TimelineV2Client
.createTimelineClient(ApplicationId.newInstance(0, 1)));
}
return mockContext;
}
private class TestParams {
boolean isLastAMRetry;
String workDir = setupTestWorkDir();
String dfsWorkDir = "/" + this.getClass().getCanonicalName();
ApplicationId appId = ApplicationId.newInstance(200, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext;
public TestParams() {
this(AppContext.class, false);
}
public TestParams(boolean isLastAMRetry) {
this(AppContext.class, isLastAMRetry);
}
public TestParams(Class<? extends AppContext> contextClass,
boolean isLastAMRetry) {
this.isLastAMRetry = isLastAMRetry;
mockAppContext = mockAppContext(contextClass, appId, this.isLastAMRetry);
}
}
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
HistoryEvent toReturn = new JobStatusChangedEvent(new JobID(Integer.toString(jobId.getId()), jobId.getId()), "change status");
return new JobHistoryEvent(jobId, toReturn);
}
@Test
/**
* Tests that in case of SIGTERM, the JHEH stops without processing its event
* queue (because we must stop quickly lest we get SIGKILLed) and processes
* a JobUnsuccessfulEvent for jobs which were still running (so that they may
* show up in the JobHistoryServer)
*/
public void testSigTermedFunctionality() throws IOException {
AppContext mockedContext = Mockito.mock(AppContext.class);
JHEventHandlerForSigtermTest jheh =
new JHEventHandlerForSigtermTest(mockedContext, 0);
JobId jobId = Mockito.mock(JobId.class);
jheh.addToFileMap(jobId);
//Submit 4 events and check that they're handled in the absence of a signal
final int numEvents = 4;
JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);
}
jheh.stop();
//Make sure events were handled
assertTrue(jheh.eventsHandled == 4, "handleEvent should've been called only 4 times but was "
+ jheh.eventsHandled);
//Create a new jheh because the last stop closed the eventWriter etc.
jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
// Make constructor of JobUnsuccessfulCompletionEvent pass
Job job = Mockito.mock(Job.class);
Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
// Make TypeConverter(JobID) pass
ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000l);
Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
jheh.addToFileMap(jobId);
jheh.setForcejobCompletion(true);
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);
}
jheh.stop();
//Make sure events were handled, 4 + 1 finish event
assertTrue(jheh.eventsHandled == 5, "handleEvent should've been called only 5 times but was "
+ jheh.eventsHandled);
assertTrue(jheh.lastEventHandled.getHistoryEvent() instanceof JobUnsuccessfulCompletionEvent,
"Last event handled wasn't JobUnsuccessfulCompletionEvent");
}
@Test
@Timeout(value = 50)
public void testSetTrackingURLAfterHistoryIsWritten() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes and successfully writes history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(
conf, t.mockAppContext.getApplicationID());
verify(t.mockAppContext, times(1)).setHistoryUrl(historyUrl);
} finally {
jheh.stop();
}
}
@Test
@Timeout(value = 50)
public void testDontSetTrackingURLIfHistoryWriteFailed() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
doReturn(false).when(jheh).moveToDoneNow(any(Path.class),
any(Path.class));
doNothing().when(jheh).moveTmpToDone(any(Path.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes, but doesn't successfully write history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
} finally {
jheh.stop();
}
}
@Test
@Timeout(value = 50)
public void testDontSetTrackingURLIfHistoryWriteThrows() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
doThrow(new YarnRuntimeException(new IOException()))
.when(jheh).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes, but doesn't successfully write history
try {
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0,
new Counters(), new Counters(), new Counters())));
throw new RuntimeException(
"processDoneFiles didn't throw, but should have");
} catch (YarnRuntimeException yre) {
// Exception expected, do nothing
}
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
} finally {
jheh.stop();
}
}
@Test
@Timeout(value = 50)
public void testJobHistoryFilePermissions() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
String setFilePermission = "777";
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_USER_DONE_DIR_PERMISSIONS, setFilePermission);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, dfsCluster.getURI().toString());
JHEvenHandlerForTest realJheh = new JHEvenHandlerForTest(t.mockAppContext,
0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new AMStartedEvent(t.appAttemptId, 200, t.containerId, "nmhost",
3000, 4000, -1)));
// Job finishes and successfully writes history
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0,
0, 0, 0, 0, 0,
new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
String intermediateSummaryFileName = JobHistoryUtils.getIntermediateSummaryFileName(t.jobId);
String doneDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
FileSystem fs = FileSystem.get(dfsCluster.getConfiguration(0));
Path intermediateSummaryFileNamePath = new Path(doneDir, intermediateSummaryFileName);
FsPermission getIntermediateSummaryFilePermission =
fs.getFileStatus(intermediateSummaryFileNamePath).getPermission();
assertEquals(setFilePermission,
String.valueOf(getIntermediateSummaryFilePermission.toOctal()));
} finally {
jheh.stop();
}
}
}
class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter;
private boolean mockHistoryProcessing = true;
private DrainDispatcher dispatcher;
public JHEvenHandlerForTest(AppContext context, int startCount) {
super(context, startCount);
JobHistoryEventHandler.fileMap.clear();
}
public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
super(context, startCount);
this.mockHistoryProcessing = mockHistoryProcessing;
JobHistoryEventHandler.fileMap.clear();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
protected void serviceStart() {
if (timelineClient != null) {
timelineClient.start();
} else if (timelineV2Client != null) {
timelineV2Client.start();
}
if (handleTimelineEvent) {
atsEventDispatcher.start();
}
}
@Override
protected AsyncDispatcher createDispatcher() {
dispatcher = new DrainDispatcher();
return dispatcher;
}
public DrainDispatcher getDispatcher() {
return dispatcher;
}
@Override
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
if (mockHistoryProcessing) {
this.eventWriter = mock(EventWriter.class);
}
else {
this.eventWriter = super.createEventWriter(historyFilePath);
}
return this.eventWriter;
}
@Override
protected void closeEventWriter(JobId jobId) {
}
public EventWriter getEventWriter() {
return this.eventWriter;
}
@Override
protected void processDoneFiles(JobId jobId) throws IOException {
if (!mockHistoryProcessing) {
super.processDoneFiles(jobId);
}
else {
// do nothing
}
}
}
/**
* Class to help with testSigTermedFunctionality
*/
class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
super(context, startCount);
JobHistoryEventHandler.fileMap.clear();
}
public void addToFileMap(JobId jobId) {
MetaInfo metaInfo = Mockito.mock(MetaInfo.class);
Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
fileMap.put(jobId, metaInfo);
}
JobHistoryEvent lastEventHandled;
int eventsHandled = 0;
@Override
public void handleEvent(JobHistoryEvent event) {
this.lastEventHandled = event;
this.eventsHandled++;
}
}