// TestJobThroughManifestCommitter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.OutputValidationException;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.ValidateRenamedFilesStage;
import org.apache.hadoop.net.NetUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadAndPrintSuccessData;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateGeneratedFiles;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.PRINCIPAL;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage.DISABLED;
import static org.apache.hadoop.security.UserGroupInformation.getCurrentUser;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test IO through the stages.
* This mimics the workflow of a job with two tasks,
* the first task has two attempts where the second attempt
* is committed after the first attempt (simulating the
* failure-during-task-commit which the v2 algorithm cannot
* handle).
*
* The test is ordered and the output dir is not cleaned up
* after each test case.
* The last test case MUST perform the cleanup.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestJobThroughManifestCommitter
    extends AbstractManifestCommitterTest {

  /** Destination directory. */
  private Path destDir;

  /** Directory names for the tests. */
  private ManifestCommitterSupport.AttemptDirectories dirs;

  /**
   * To ensure that the local FS has a shared root path, this is static.
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private static Path sharedTestRoot = null;

  /**
   * Job ID.
   */
  private String jobId;

  /**
   * Task 0 attempt 0 ID.
   */
  private String taskAttempt00;

  /**
   * Task 0 attempt 1 ID.
   */
  private String taskAttempt01;

  /**
   * Task 1 attempt 0 ID.
   */
  private String taskAttempt10;

  /**
   * Task 1 attempt 1 ID.
   */
  private String taskAttempt11;

  /**
   * Stage config for TA00.
   */
  private StageConfig ta00Config;

  /**
   * Stage config for TA01.
   */
  private StageConfig ta01Config;

  /**
   * Stage config for TA10.
   */
  private StageConfig ta10Config;

  /**
   * Stage config for TA11.
   */
  private StageConfig ta11Config;

  /**
   * Loaded manifest data, set in job commit and used in validation.
   * This is static so it can be passed from where it is loaded
   * {@link #test_0400_loadManifests()} to subsequent tests.
   */
  private static LoadedManifestData
      loadedManifestData;

  @Override
  public void setup() throws Exception {
    super.setup();
    taskAttempt00 = TASK_IDS.getTaskAttempt(TASK0, TA0);
    taskAttempt01 = TASK_IDS.getTaskAttempt(TASK0, TA1);
    taskAttempt10 = TASK_IDS.getTaskAttempt(TASK1, TA0);
    taskAttempt11 = TASK_IDS.getTaskAttempt(TASK1, TA1);
    // the shared root is only set by the first test instance to run;
    // later instances reuse it so every ordered test sees the same paths.
    setSharedPath(path("TestJobThroughManifestCommitter"));
    // add a dir with a space in.
    destDir = new Path(sharedTestRoot, "out put");
    jobId = TASK_IDS.getJobId();
    // then the specific path underneath that for the attempt.
    dirs = new ManifestCommitterSupport.AttemptDirectories(destDir,
        jobId, 1);
    // config for job attempt 1, task 00
    setJobStageConfig(createStageConfigForJob(JOB1, destDir).build());
    ta00Config = createStageConfig(JOB1, TASK0, TA0, destDir).build();
    ta01Config = createStageConfig(JOB1, TASK0, TA1, destDir).build();
    ta10Config = createStageConfig(JOB1, TASK1, TA0, destDir).build();
    ta11Config = createStageConfig(JOB1, TASK1, TA1, destDir).build();
  }

  /**
   * Test dir deletion is removed from test case teardown so the
   * subsequent tests see the output.
   * @throws IOException failure
   */
  @Override
  protected void deleteTestDirInTeardown() throws IOException {
    /* no-op */
  }

  /**
   * Override point and something to turn on/off when exploring what manifests look like.
   * Stores where storage is billed MUST enable this.
   * @return true if, at the end of the run, the test dir should be deleted.
   */
  protected boolean shouldDeleteTestRootAtEndOfTestRun() {
    return false;
  }

  /**
   * Invoke this to clean up the test directories.
   */
  private void deleteSharedTestRoot() throws IOException {
    describe("Deleting shared test root %s", sharedTestRoot);
    rm(getFileSystem(), sharedTestRoot, true, false);
  }

  /**
   * Set the shared test root if not already set.
   * @param path path to set.
   * @return true if the path was set
   */
  private static synchronized boolean setSharedPath(final Path path) {
    if (sharedTestRoot == null) {
      // set this as needed
      LOG.info("Set shared path to {}", path);
      sharedTestRoot = path;
      return true;
    }
    return false;
  }

  @Test
  public void test_0000_setupTestDir() throws Throwable {
    describe("always ensure directory setup is empty");
    deleteSharedTestRoot();
  }

  @Test
  public void test_0100_setupJobStage() throws Throwable {
    describe("Set up a job");
    verifyPath("Job attempt dir",
        dirs.getJobAttemptDir(),
        new SetupJobStage(getJobStageConfig()).apply(true));
  }

  /**
   * And the check that the stage worked.
   * @throws IOException failure.
   */
  private void verifyJobSetupCompleted() throws IOException {
    assertPathExists("Job attempt dir from test_0100", dirs.getJobAttemptDir());
  }

  @Test
  public void test_0110_setupJobOnlyAllowedOnce() throws Throwable {
    describe("a second creation of a job attempt must fail");
    verifyJobSetupCompleted();
    intercept(FileAlreadyExistsException.class, "", () ->
        new SetupJobStage(getJobStageConfig()).apply(true));
    // job is still there
    assertPathExists("Job attempt dir", dirs.getJobAttemptDir());
  }

  @Test
  public void test_0120_setupJobNewAttemptNumber() throws Throwable {
    describe("Creating a new job attempt is supported");
    verifyJobSetupCompleted();
    // setting up a different attempt number must create a new, distinct dir
    Path path = pathMustExist("Job attempt 2 dir",
        new SetupJobStage(createStageConfig(2, -1, 0, destDir))
            .apply(false));
    Assertions.assertThat(path)
        .describedAs("Stage created path")
        .isNotEqualTo(dirs.getJobAttemptDir());
  }

  @Test
  public void test_0200_setupTask00() throws Throwable {
    describe("Set up a task; job must have been set up first");
    verifyJobSetupCompleted();
    verifyPath("Task attempt 00",
        dirs.getTaskAttemptPath(taskAttempt00),
        new SetupTaskStage(ta00Config).apply("first"));
  }

  /**
   * Verify TA00 is set up.
   */
  private void verifyTaskAttempt00SetUp() throws IOException {
    pathMustExist("Dir from taskAttempt00 setup",
        dirs.getTaskAttemptPath(taskAttempt00));
  }

  @Test
  public void test_0210_setupTask00OnlyAllowedOnce() throws Throwable {
    describe("Second attempt to set up task00 must fail.");
    verifyTaskAttempt00SetUp();
    intercept(FileAlreadyExistsException.class, "second", () ->
        new SetupTaskStage(ta00Config).apply("second"));
  }

  @Test
  public void test_0220_setupTask01() throws Throwable {
    describe("Setup task attempt 01");
    verifyTaskAttempt00SetUp();
    verifyPath("Task attempt 01",
        dirs.getTaskAttemptPath(taskAttempt01),
        new SetupTaskStage(ta01Config)
            .apply("01"));
  }

  @Test
  public void test_0230_setupTask10() throws Throwable {
    describe("Setup task attempt 10");
    verifyJobSetupCompleted();
    verifyPath("Task attempt 10",
        dirs.getTaskAttemptPath(taskAttempt10),
        new SetupTaskStage(ta10Config)
            .apply("10"));
  }

  /**
   * Setup then abort task 11 before creating any files;
   * verify that commit fails before creating a manifest file.
   */
  @Test
  public void test_0240_setupThenAbortTask11() throws Throwable {
    describe("Setup then abort task attempt 11");
    verifyJobSetupCompleted();
    Path ta11Path = new SetupTaskStage(ta11Config).apply("11");
    Path deletedDir = new AbortTaskStage(ta11Config).apply(false);
    Assertions.assertThat(ta11Path)
        .isEqualTo(deletedDir);
    assertPathDoesNotExist("aborted directory", ta11Path);
    // execute will fail as there's no dir to list.
    intercept(FileNotFoundException.class, () ->
        new CommitTaskStage(ta11Config).apply(null));
    assertPathDoesNotExist("task manifest",
        manifestPathForTask(dirs.getTaskManifestDir(),
            TASK_IDS.getTaskId(TASK1)));
  }

  /**
   * Execute TA00 by generating a lot of files in its directory
   * then committing the task attempt.
   * The manifest at the task path (i.e. the record of which attempt's
   * output is to be used) MUST now have been generated by this TA.
   */
  @Test
  public void test_0300_executeTask00() throws Throwable {
    describe("Create the files for Task 00, then commit the task");
    List<Path> files = createFilesOrDirs(dirs.getTaskAttemptPath(taskAttempt00),
        "part-00", getExecutorService(),
        DEPTH, WIDTH, FILES_PER_DIRECTORY, false);
    // saves the task manifest to the job dir
    CommitTaskStage.Result result = new CommitTaskStage(ta00Config)
        .apply(null);
    verifyPathExists(getFileSystem(), "manifest",
        result.getPath());
    TaskManifest manifest = result.getTaskManifest();
    manifest.validate();
    // clear the IOStats to reduce the size of the printed JSON.
    manifest.setIOStatistics(null);
    LOG.info("Task Manifest {}", manifest.toJson());
    validateTaskAttemptManifest(this.taskAttempt00, files, manifest);
  }

  /**
   * Validate the manifest of a task attempt.
   * @param attemptId attempt ID
   * @param files files which were created.
   * @param manifest manifest
   * @throws IOException IO problem
   */
  protected void validateTaskAttemptManifest(
      String attemptId,
      List<Path> files,
      TaskManifest manifest) throws IOException {
    verifyManifestTaskAttemptID(manifest, attemptId);
    // validate the manifest
    verifyManifestFilesMatch(manifest, files);
  }

  /**
   * Execute TA01 by generating a lot of files in its directory
   * then committing the task attempt.
   * The manifest at the task path (i.e. the record of which attempt's
   * output is to be used) MUST now have been generated by this TA.
   * Any existing manifest will have been overwritten.
   */
  @Test
  public void test_0310_executeTask01() throws Throwable {
    describe("Create the files for Task 01, then commit the task");
    List<Path> files = createFilesOrDirs(dirs.getTaskAttemptPath(taskAttempt01),
        "part-00", getExecutorService(),
        DEPTH, WIDTH, FILES_PER_DIRECTORY, false);
    // saves the task manifest to the job dir
    CommitTaskStage.Result result = new CommitTaskStage(ta01Config)
        .apply(null);
    Path manifestPath = verifyPathExists(getFileSystem(), "manifest",
        result.getPath()).getPath();
    // load the manifest from the FS, not the return value,
    // so we can verify that last task to commit wins.
    TaskManifest manifest = TaskManifest.load(getFileSystem(), manifestPath);
    manifest.validate();
    // clear the IOStats to reduce the size of the printed JSON.
    manifest.setIOStatistics(null);
    LOG.info("Task Manifest {}", manifest.toJson());
    validateTaskAttemptManifest(taskAttempt01, files, manifest);
  }

  /**
   * Second task writes to more directories, but fewer files per dir.
   * This ensures that there will be dirs here which aren't in the first
   * attempt.
   */
  @Test
  public void test_0320_executeTask10() throws Throwable {
    describe("Create the files for Task 10, then commit the task");
    List<Path> files = createFilesOrDirs(
        dirs.getTaskAttemptPath(ta10Config.getTaskAttemptId()),
        "part-01", getExecutorService(),
        DEPTH, WIDTH + 1, FILES_PER_DIRECTORY - 1, false);
    // saves the task manifest to the job dir
    CommitTaskStage.Result result = new CommitTaskStage(ta10Config)
        .apply(null);
    TaskManifest manifest = result.getTaskManifest();
    validateTaskAttemptManifest(taskAttempt10, files, manifest);
  }

  /**
   * Setup and abort TA11 after it has created files; the earlier
   * manifest from TA10 must be left untouched by the failed commit.
   */
  @Test
  public void test_0340_setupThenAbortTask11() throws Throwable {
    describe("Setup then abort task attempt 11");
    Path ta11Path = new SetupTaskStage(ta11Config).apply("11");
    createFilesOrDirs(
        ta11Path,
        "part-01", getExecutorService(),
        2, 1, 1, false);
    new AbortTaskStage(ta11Config).apply(false);
    assertPathDoesNotExist("aborted directory", ta11Path);
    // execute will fail as there's no dir to list.
    intercept(FileNotFoundException.class, () ->
        new CommitTaskStage(ta11Config).apply(null));
    // and the manifest MUST be unchanged from the previous stage
    Path manifestPathForTask1 = manifestPathForTask(dirs.getTaskManifestDir(),
        TASK_IDS.getTaskId(TASK1));
    verifyManifestTaskAttemptID(
        TaskManifest.load(getFileSystem(), manifestPathForTask1),
        taskAttempt10);
  }

  /**
   * Load all the committed manifests, which must be TA01 (last of
   * task 0 to commit) and TA10.
   */
  @Test
  public void test_0400_loadManifests() throws Throwable {
    describe("Load all manifests; committed must be TA01 and TA10");
    // temp file to hold the entry sequence data; created in java.io.tmpdir.
    File entryFile = File.createTempFile("entry", ".seq");
    LoadManifestsStage.Arguments args = new LoadManifestsStage.Arguments(
        entryFile, DEFAULT_WRITER_QUEUE_CAPACITY);
    LoadManifestsStage.Result result
        = new LoadManifestsStage(getJobStageConfig()).apply(args);
    // stash in the static field for the later validation tests.
    loadedManifestData = result.getLoadedManifestData();
    Assertions.assertThat(loadedManifestData)
        .describedAs("manifest data from %s", result)
        .isNotNull();
    final LoadManifestsStage.SummaryInfo stageSummary = result.getSummary();
    String summary = stageSummary.toString();
    LOG.info("Manifest summary {}", summary);
    Assertions.assertThat(stageSummary.getTaskAttemptIDs())
        .describedAs("Task attempts in %s", summary)
        .hasSize(2)
        .contains(taskAttempt01, taskAttempt10);
  }

  @Test
  public void test_0410_commitJob() throws Throwable {
    describe("Commit the job");
    CommitJobStage stage = new CommitJobStage(getJobStageConfig());
    // create the _SUCCESS marker but skip validation and cleanup here;
    // those are exercised by the following ordered tests.
    stage.apply(new CommitJobStage.Arguments(true, false, null, DISABLED));
  }

  /**
   * Validate that the job output is good by invoking the
   * {@link ValidateRenamedFilesStage} stage to
   * validate all the manifests.
   */
  @Test
  public void test_0420_validateJob() throws Throwable {
    describe("Validate the output of the job through the validation"
        + " stage");
    Assumptions.assumeThat(loadedManifestData)
        .describedAs("Loaded Manifest Data from earlier stage")
        .isNotNull();
    // load in the success data.
    ManifestSuccessData successData = loadAndPrintSuccessData(
        getFileSystem(),
        getJobStageConfig().getJobSuccessMarkerPath());
    // Now verify their files exist, returning the list of renamed files.
    final List<FileEntry> validatedEntries = new ValidateRenamedFilesStage(getJobStageConfig())
        .apply(loadedManifestData.getEntrySequenceData());
    List<String> committedFiles = validatedEntries
        .stream().map(FileEntry::getDest)
        .collect(Collectors.toList());
    // verify that the list of committed files also matches
    // that in the _SUCCESS file
    // note: there's a limit to the #of files in the SUCCESS file
    // to stop writing it slowing down jobs; therefore we don't
    // make a simple "all must match" check.
    Assertions.assertThat(committedFiles)
        .containsAll(successData.getFilenames());
  }

  /**
   * Validate the fields, diagnostics and IO statistics of the
   * _SUCCESS data written by job commit.
   */
  @Test
  public void test_0430_validateStatistics() throws Throwable {
    // load in the success data.
    ManifestSuccessData successData = ManifestSuccessData.load(
        getFileSystem(),
        getJobStageConfig().getJobSuccessMarkerPath());
    String json = successData.toJson();
    LOG.info("Success data is {}", json);
    Assertions.assertThat(successData)
        .describedAs("Manifest " + json)
        .returns(NetUtils.getLocalHostname(),
            ManifestSuccessData::getHostname)
        .returns(MANIFEST_COMMITTER_CLASSNAME,
            ManifestSuccessData::getCommitter)
        .returns(jobId,
            ManifestSuccessData::getJobId)
        .returns(true,
            ManifestSuccessData::getSuccess)
        .returns(JOB_ID_SOURCE_MAPREDUCE,
            ManifestSuccessData::getJobIdSource);
    // diagnostics
    Assertions.assertThat(successData.getDiagnostics())
        .containsEntry(PRINCIPAL,
            getCurrentUser().getShortUserName())
        .containsEntry(STAGE, OP_STAGE_JOB_COMMIT);
    // and stats
    IOStatisticsSnapshot iostats = successData.getIOStatistics();
    int files = successData.getFilenames().size();
    verifyStatisticCounterValue(iostats,
        OP_STAGE_JOB_COMMIT, 1);
    // the filenames list in _SUCCESS may be truncated, so the committed
    // count can legitimately exceed it.
    assertThatStatisticCounter(iostats,
        COMMITTER_FILES_COMMITTED_COUNT)
        .isGreaterThanOrEqualTo(files);
    Long totalFiles = iostats.counters().get(COMMITTER_FILES_COMMITTED_COUNT);
    // NOTE(review): assumes every generated file is exactly two bytes long
    // — confirm against the createFilesOrDirs() implementation.
    verifyStatisticCounterValue(iostats,
        COMMITTER_BYTES_COMMITTED_COUNT, totalFiles * 2);
  }

  /**
   * Verify every file named in the _SUCCESS data exists under
   * the destination directory.
   */
  @Test
  public void test_0440_validateSuccessFiles() throws Throwable {
    // load in the success data.
    final FileSystem fs = getFileSystem();
    ManifestSuccessData successData = loadAndPrintSuccessData(
        fs,
        getJobStageConfig().getJobSuccessMarkerPath());
    validateGeneratedFiles(fs,
        getJobStageConfig().getDestinationDir(),
        successData, false);
  }

  /**
   * Verify that the validation stage will correctly report a failure
   * if one of the files has a different name.
   */
  @Test
  public void test_0450_validationDetectsFailures() throws Throwable {
    // delete an entry, repeat
    final List<FileEntry> validatedEntries = new ValidateRenamedFilesStage(getJobStageConfig())
        .apply(loadedManifestData.getEntrySequenceData());
    final Path path = validatedEntries.get(0).getDestPath();
    final Path p2 = new Path(path.getParent(), path.getName() + "-renamed");
    final FileSystem fs = getFileSystem();
    fs.rename(path, p2);
    try {
      intercept(OutputValidationException.class, () ->
          new ValidateRenamedFilesStage(getJobStageConfig())
              .apply(loadedManifestData.getEntrySequenceData()));
    } finally {
      // if this doesn't happen, later stages will fail.
      fs.rename(p2, path);
    }
  }

  /**
   * Clean up the job attempt directories; a rerun of the stage
   * must report nothing left to clean up.
   */
  @Test
  public void test_0900_cleanupJob() throws Throwable {
    describe("Cleanup job");
    CleanupJobStage.Arguments arguments = new CleanupJobStage.Arguments(
        OP_STAGE_JOB_CLEANUP, true, true,
        false, false, 0);
    // the first run will list the three task attempt dirs and delete each
    // one before the toplevel dir.
    CleanupJobStage.Result result = new CleanupJobStage(
        getJobStageConfig()).apply(arguments);
    assertCleanupResult(result, CleanupJobStage.Outcome.PARALLEL_DELETE, 1 + 3);
    assertPathDoesNotExist("Job attempt dir", result.getDirectory());
    // not an error if we retry and the dir isn't there
    result = new CleanupJobStage(getJobStageConfig()).apply(arguments);
    assertCleanupResult(result, CleanupJobStage.Outcome.NOTHING_TO_CLEAN_UP, 0);
  }

  /**
   * Needed to clean up the shared test root, as test case teardown
   * does not do it.
   */
  @Test
  public void test_9999_cleanupTestDir() throws Throwable {
    if (shouldDeleteTestRootAtEndOfTestRun()) {
      deleteSharedTestRoot();
    }
  }
}