AbstractITCommitProtocol.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_INTELLIGENT_TIERING;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.jupiter.api.Assertions.*;
/**
* Test the job/task commit actions of an S3A Committer, including trying to
* simulate some failure and retry conditions.
* Derived from
* {@code org.apache.hadoop.mapreduce.lib.output.TestFileOutputCommitter}.
*
* This is a complex test suite as it tries to explore the full lifecycle
* of committers, and is designed for subclassing.
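* <p>
* A minimal sketch of a concrete subclass, loosely modelled on the
* magic committer's suite; the {@code createFailingCommitter}
* override and committer-specific validation are omitted:
* <pre>{@code
* public class ITestExampleCommitProtocol extends AbstractITCommitProtocol {
*   @Override
*   protected String suitename() {
*     return "ITestExampleCommitProtocol";
*   }
*   @Override
*   protected String getCommitterName() {
*     return CommitConstants.COMMITTER_NAME_MAGIC;
*   }
*   @Override
*   protected AbstractS3ACommitter createCommitter(Path outputPath,
*       TaskAttemptContext context) throws IOException {
*     return new MagicS3GuardCommitter(outputPath, context);
*   }
* }
* }</pre>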
*/
@SuppressWarnings({"unchecked", "unused"})
public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
private Path outDir;
private static final Logger LOG =
LoggerFactory.getLogger(AbstractITCommitProtocol.class);
private static final String SUB_DIR = "SUB_DIR";
protected static final String PART_00000 = "part-m-00000";
/** Filter out "hidden" files whose names start with "_" or ".". */
private static final PathFilter HIDDEN_FILE_FILTER = (path) ->
!path.getName().startsWith("_") && !path.getName().startsWith(".");
/**
* Job ID; random per test case so that even in parallel test runs,
* no two jobs share the same ID.
*/
private String jobId;
// A random task attempt id for testing.
private String attempt0;
private TaskAttemptID taskAttempt0;
private String attempt1;
private TaskAttemptID taskAttempt1;
private static final Text KEY_1 = new Text("key1");
private static final Text KEY_2 = new Text("key2");
private static final Text VAL_1 = new Text("val1");
private static final Text VAL_2 = new Text("val2");
/** Jobs to abort in test case teardown. */
private final List<JobData> abortInTeardown = new ArrayList<>(1);
private final StandardCommitterFactory
standardCommitterFactory = new StandardCommitterFactory();
private void cleanupDestDir() throws IOException {
rmdir(this.outDir, getConfiguration());
}
/**
* This must return the name of a suite which is unique to the non-abstract
* test.
* @return a string which must be unique and a valid path.
*/
protected abstract String suitename();
/**
* Get the log; can be overridden for test case log.
* @return a log.
*/
public Logger log() {
return LOG;
}
/**
* Overridden method returns the suitename as well as the method name,
* so if more than one committer test is run in parallel, paths are
* isolated.
* @return a name for a method, unique across the suites and test cases.
*/
@Override
protected String getMethodName() {
return suitename() + "-" + super.getMethodName();
}
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
jobId = randomJobId();
attempt0 = "attempt_" + jobId + "_m_000000_0";
taskAttempt0 = TaskAttemptID.forName(attempt0);
attempt1 = "attempt_" + jobId + "_m_000001_0";
taskAttempt1 = TaskAttemptID.forName(attempt1);
outDir = path(getMethodName());
abortMultipartUploadsUnderPath(outDir);
cleanupDestDir();
}
@AfterEach
@Override
public void teardown() throws Exception {
describe("teardown");
abortInTeardown.forEach(this::abortJobQuietly);
if (outDir != null) {
try (AuditSpan span = span()) {
abortMultipartUploadsUnderPath(outDir);
cleanupDestDir();
} catch (IOException e) {
log().info("Exception during cleanup", e);
}
}
S3AFileSystem fileSystem = getFileSystem();
if (fileSystem != null) {
log().info("Statistics for {}:\n{}", fileSystem.getUri(),
fileSystem.getInstrumentation().dump(" ", " = ", "\n", true));
}
super.teardown();
}
/**
* This only looks for leakage of committer thread pools,
* and not any other leaked threads, such as those from S3A FS instances.
*/
@AfterAll
public static void checkForThreadLeakage() {
List<String> committerThreads = getCurrentThreadNames().stream()
.filter(n -> n.startsWith(AbstractS3ACommitter.THREAD_PREFIX))
.collect(Collectors.toList());
Assertions.assertThat(committerThreads)
.describedAs("Outstanding committer threads")
.isEmpty();
}
/**
* Add the specified job to the current list of jobs to abort in teardown.
* @param jobData job data.
*/
protected void abortInTeardown(JobData jobData) {
abortInTeardown.add(jobData);
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
disableFilesystemCaching(conf);
bindCommitter(conf);
return conf;
}
/**
* Bind to the committer from the methods of
* {@link #getCommitterFactoryName()} and {@link #getCommitterName()}.
* @param conf configuration to set up
*/
protected void bindCommitter(Configuration conf) {
super.bindCommitter(conf, getCommitterFactoryName(), getCommitterName());
}
/**
* Create a committer for a task.
* @param context task context
* @return new committer
* @throws IOException failure
*/
protected AbstractS3ACommitter createCommitter(
TaskAttemptContext context) throws IOException {
return createCommitter(getOutDir(), context);
}
/**
* Create a committer for a task and a given output path.
* @param outputPath path
* @param context task context
* @return new committer
* @throws IOException failure
*/
protected abstract AbstractS3ACommitter createCommitter(
Path outputPath,
TaskAttemptContext context) throws IOException;
protected String getCommitterFactoryName() {
return CommitConstants.S3A_COMMITTER_FACTORY;
}
protected abstract String getCommitterName();
protected Path getOutDir() {
return outDir;
}
protected String getJobId() {
return jobId;
}
protected String getAttempt0() {
return attempt0;
}
protected TaskAttemptID getTaskAttempt0() {
return taskAttempt0;
}
protected String getAttempt1() {
return attempt1;
}
protected TaskAttemptID getTaskAttempt1() {
return taskAttempt1;
}
/**
* Functional interface for creating committers, designed to allow
* different factories to be used to create different failure modes.
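* As a functional interface, a factory can be supplied as a lambda,
* e.g. {@code context -> createFailingCommitter(context)}.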
*/
@FunctionalInterface
public interface CommitterFactory {
/**
* Create a committer for a task.
* @param context task context
* @return new committer
* @throws IOException failure
*/
AbstractS3ACommitter createCommitter(
TaskAttemptContext context) throws IOException;
}
/**
* The normal committer creation factory, uses the abstract methods
* in the class.
*/
public class StandardCommitterFactory implements CommitterFactory {
@Override
public AbstractS3ACommitter createCommitter(TaskAttemptContext context)
throws IOException {
return AbstractITCommitProtocol.this.createCommitter(context);
}
}
/**
* Write some text out.
* @param context task
* @return the path written to
* @throws IOException IO failure
* @throws InterruptedException write interrupted
*/
protected Path writeTextOutput(TaskAttemptContext context)
throws IOException, InterruptedException {
describe("write output");
try (DurationInfo d = new DurationInfo(LOG,
"Writing Text output for task %s", context.getTaskAttemptID())) {
LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
recordWriter = new LoggingTextOutputFormat<>().getRecordWriter(
context);
writeOutput(recordWriter,
context);
return recordWriter.getDest();
}
}
/**
* Write the standard output.
* @param writer record writer
* @param context task context
* @throws IOException IO failure
* @throws InterruptedException write interrupted
*/
private void writeOutput(RecordWriter writer,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
try (CloseWriter cw = new CloseWriter(writer, context)) {
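// the record writer treats null/NullWritable keys and values as
// absent, so these eight writes produce only the six lines which
// validateContent() expects.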
writer.write(KEY_1, VAL_1);
writer.write(null, nullWritable);
writer.write(null, VAL_1);
writer.write(nullWritable, VAL_2);
writer.write(KEY_2, nullWritable);
writer.write(KEY_1, null);
writer.write(null, null);
writer.write(KEY_2, VAL_2);
writer.close(context);
}
}
/**
* Write the output of a map.
* @param writer record writer
* @param context task context
* @throws IOException IO failure
* @throws InterruptedException write interrupted
*/
private void writeMapFileOutput(RecordWriter writer,
TaskAttemptContext context) throws IOException, InterruptedException {
describe("\nWrite map output");
try (DurationInfo d = new DurationInfo(LOG,
"Writing map output for task %s", context.getTaskAttemptID());
CloseWriter cw = new CloseWriter(writer, context)) {
for (int i = 0; i < 10; ++i) {
Text val = ((i & 1) == 1) ? VAL_1 : VAL_2;
writer.write(new LongWritable(i), val);
}
writer.close(context);
}
}
/**
* Details on a job for use in {@code startJob} and elsewhere.
*/
public static class JobData {
private final Job job;
private final JobContext jContext;
private final TaskAttemptContext tContext;
private final AbstractS3ACommitter committer;
private final Configuration conf;
private Path writtenTextPath; // null if not written to
public JobData(Job job,
JobContext jContext,
TaskAttemptContext tContext,
AbstractS3ACommitter committer) {
this.job = job;
this.jContext = jContext;
this.tContext = tContext;
this.committer = committer;
conf = job.getConfiguration();
}
public Job getJob() {
return job;
}
public JobContext getJContext() {
return jContext;
}
public TaskAttemptContext getTContext() {
return tContext;
}
public AbstractS3ACommitter getCommitter() {
return committer;
}
public Configuration getConf() {
return conf;
}
public Path getWrittenTextPath() {
return writtenTextPath;
}
}
/**
* Create a new job. Sets the task attempt ID,
* and output dir; asks for a success marker.
* @return the new job
* @throws IOException failure
*/
public Job newJob() throws IOException {
return newJob(outDir, getConfiguration(), attempt0);
}
/**
* Create a new job. Sets the task attempt ID,
* and output dir; asks for a success marker.
* @param dir dest dir
* @param configuration config to get the job from
* @param taskAttemptId task attempt
* @return the new job
* @throws IOException failure
*/
private Job newJob(Path dir, Configuration configuration,
String taskAttemptId) throws IOException {
Job job = Job.getInstance(configuration);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId);
conf.setBoolean(CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
FileOutputFormat.setOutputPath(job, dir);
return job;
}
/**
* Start a job with a committer; optionally write the test data.
* Always register the job to be aborted (quietly) in teardown.
* This is, from an "OO-purity perspective" the wrong kind of method to
* do: it's setting things up, mixing functionality, registering for teardown.
* Its aim is simple though: a common body of code for starting work
* in test cases.
* @param writeText should the text be written?
* @return the job data 4-tuple
* @throws IOException IO problems
* @throws InterruptedException interruption during write
*/
protected JobData startJob(boolean writeText)
throws IOException, InterruptedException {
return startJob(standardCommitterFactory, writeText);
}
/**
* Start a job with a committer; optionally write the test data.
* Always register the job to be aborted (quietly) in teardown.
* This is, from an "OO-purity perspective" the wrong kind of method to
* do: it's setting things up, mixing functionality, registering for teardown.
* Its aim is simple though: a common body of code for starting work
* in test cases.
* @param factory the committer factory to use
* @param writeText should the text be written?
* @return the job data 4-tuple
* @throws IOException IO problems
* @throws InterruptedException interruption during write
*/
protected JobData startJob(CommitterFactory factory, boolean writeText)
throws IOException, InterruptedException {
Job job = newJob();
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
taskAttempt0);
AbstractS3ACommitter committer = factory.createCommitter(tContext);
// setup
JobData jobData = new JobData(job, jContext, tContext, committer);
setup(jobData);
abortInTeardown(jobData);
if (writeText) {
// write output
jobData.writtenTextPath = writeTextOutput(tContext);
}
return jobData;
}
/**
* Set up the job and task.
* @param jobData job data
* @throws IOException problems
*/
protected void setup(JobData jobData) throws IOException {
AbstractS3ACommitter committer = jobData.committer;
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
describe("\nsetup job");
try (DurationInfo d = new DurationInfo(LOG,
"setup job %s", jContext.getJobID())) {
committer.setupJob(jContext);
}
setupCommitter(committer, tContext);
describe("setup complete\n");
}
private void setupCommitter(
final AbstractS3ACommitter committer,
final TaskAttemptContext tContext) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"setup task %s", tContext.getTaskAttemptID())) {
committer.setupTask(tContext);
}
}
/**
* Abort a job quietly.
* @param jobData job info
*/
protected void abortJobQuietly(JobData jobData) {
abortJobQuietly(jobData.committer, jobData.jContext, jobData.tContext);
}
/**
* Abort a job quietly: first the task, then the job.
* @param committer committer
* @param jContext job context
* @param tContext task context
*/
protected void abortJobQuietly(AbstractS3ACommitter committer,
JobContext jContext,
TaskAttemptContext tContext) {
describe("\naborting task");
try {
committer.abortTask(tContext);
} catch (IOException e) {
log().warn("Exception aborting task:", e);
}
describe("\naborting job");
try {
committer.abortJob(jContext, JobStatus.State.KILLED);
} catch (IOException e) {
log().warn("Exception aborting job", e);
}
}
/**
* Commit the task and then the job.
* @param committer committer
* @param jContext job context
* @param tContext task context
* @throws IOException problems
*/
protected void commit(AbstractS3ACommitter committer,
JobContext jContext,
TaskAttemptContext tContext) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"committing work of job %s", jContext.getJobID())) {
describe("\ncommitting task");
committer.commitTask(tContext);
describe("\ncommitting job");
committer.commitJob(jContext);
describe("commit complete\n");
}
}
/**
* Execute work as part of a test, after creating the job.
* After the execution, {@link #abortJobQuietly(JobData)} is
* called for abort/cleanup.
* @param name name of work (for logging)
* @param action action to execute
* @throws Exception failure
*/
protected void executeWork(String name, ActionToTest action)
throws Exception {
executeWork(name, startJob(false), action);
}
/**
* Execute work as part of a test, against the created job.
* After the execution, {@link #abortJobQuietly(JobData)} is
* called for abort/cleanup.
* @param name name of work (for logging)
* @param jobData job info
* @param action action to execute
* @throws Exception failure
*/
public void executeWork(String name,
JobData jobData,
ActionToTest action) throws Exception {
try (DurationInfo d = new DurationInfo(LOG, "Executing %s", name)) {
action.exec(jobData.job,
jobData.jContext,
jobData.tContext,
jobData.committer);
} finally {
abortJobQuietly(jobData);
}
}
/**
* Verify that recovery doesn't work for these committers.
*/
@Test
@SuppressWarnings("deprecation")
public void testRecoveryAndCleanup() throws Exception {
describe("Test (Unsupported) task recovery.");
JobData jobData = startJob(true);
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
assertNotNull(committer.getWorkPath(),
"null workPath in committer " + committer);
assertNotNull(committer.getOutputPath(),
"null outputPath in committer " + committer);
// note the task attempt path.
Path job1TaskAttempt0Path = committer.getTaskAttemptPath(tContext);
// Commit the task. This will promote data and metadata to where
// job commits will pick it up on commit or abort.
commitTask(committer, tContext);
assertTaskAttemptPathDoesNotExist(committer, tContext);
Configuration conf2 = jobData.job.getConfiguration();
conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
JobContext jContext2 = new JobContextImpl(conf2, taskAttempt0.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2,
taskAttempt0);
AbstractS3ACommitter committer2 = createCommitter(tContext2);
committer2.setupJob(tContext2);
assertFalse(committer2.isRecoverySupported(),
"recoverySupported in " + committer2);
intercept(PathCommitException.class, "recover",
() -> committer2.recoverTask(tContext2));
// the new task attempt path is different from the first, because the
// job attempt counter is used in the path
final Path job2TaskAttempt0Path = committer2.getTaskAttemptPath(tContext2);
LOG.info("Job attempt 1 task attempt path {}; attempt 2 path {}",
job1TaskAttempt0Path, job2TaskAttempt0Path);
assertNotEquals(job1TaskAttempt0Path,
job2TaskAttempt0Path, "Task attempt paths must differ");
// at this point, task attempt 0 has failed to recover
// it should be abortable though. This will be a no-op as it already
// committed
describe("aborting task attempt 2; expect nothing to clean up");
committer2.abortTask(tContext2);
describe("Aborting job 2; expect pending commits to be aborted");
committer2.abortJob(jContext2, JobStatus.State.KILLED);
// after the task and job aborts, no uploads may be left pending
assertNoMultipartUploadsPending(outDir);
}
protected void assertTaskAttemptPathDoesNotExist(
AbstractS3ACommitter committer, TaskAttemptContext context)
throws IOException {
Path attemptPath = committer.getTaskAttemptPath(context);
ContractTestUtils.assertPathDoesNotExist(
attemptPath.getFileSystem(context.getConfiguration()),
"task attempt dir",
attemptPath);
}
protected void assertJobAttemptPathDoesNotExist(
AbstractS3ACommitter committer, JobContext context)
throws IOException {
Path attemptPath = committer.getJobAttemptPath(context);
ContractTestUtils.assertPathDoesNotExist(
attemptPath.getFileSystem(context.getConfiguration()),
"job attempt dir",
attemptPath);
}
/**
* Verify the output of the directory.
* That includes the {@code part-m-00000-*}
* file existence and contents, as well as optionally, the success marker.
* @param dir directory to scan.
* @param expectSuccessMarker check the success marker?
* @param expectedJobId job ID, verified if non-empty and success data loaded
* @throws Exception failure.
*/
private void validateContent(Path dir,
boolean expectSuccessMarker,
String expectedJobId) throws Exception {
if (expectSuccessMarker) {
verifySuccessMarker(dir, expectedJobId);
}
Path expectedFile = getPart0000(dir);
log().debug("Validating content in {}", expectedFile);
StringBuilder expectedOutput = new StringBuilder();
expectedOutput.append(KEY_1).append('\t').append(VAL_1).append("\n");
expectedOutput.append(VAL_1).append("\n");
expectedOutput.append(VAL_2).append("\n");
expectedOutput.append(KEY_2).append("\n");
expectedOutput.append(KEY_1).append("\n");
expectedOutput.append(KEY_2).append('\t').append(VAL_2).append("\n");
String output = readFile(expectedFile);
assertEquals(expectedOutput.toString(), output,
"Content of " + expectedFile);
}
/**
* Verify storage class of output file matches the expected storage class.
* @param dir output directory.
* @param expectedStorageClass expected storage class value.
* @throws Exception failure.
*/
private void validateStorageClass(Path dir, String expectedStorageClass) throws Exception {
Path expectedFile = getPart0000(dir);
String actualStorageClass = getS3AInternals().getObjectMetadata(expectedFile)
.storageClassAsString();
Assertions.assertThat(actualStorageClass)
.describedAs("Storage class of object %s", expectedFile)
.isEqualToIgnoringCase(expectedStorageClass);
}
/**
* Identify any path under the directory which begins with the
* {@code "part-m-00000"} sequence.
* @param dir directory to scan
* @return the full path
* @throws FileNotFoundException the path is missing.
* @throws Exception failure.
*/
protected Path getPart0000(final Path dir) throws Exception {
final FileSystem fs = dir.getFileSystem(getConfiguration());
FileStatus[] statuses = fs.listStatus(dir,
path -> path.getName().startsWith(PART_00000));
if (statuses.length != 1) {
// fail, with a listing of the parent dir
ContractTestUtils.assertPathExists(fs, "Output file",
new Path(dir, PART_00000));
}
return statuses[0].getPath();
}
/**
* Look for the partFile subdir of the output dir.
* @param fs filesystem
* @param dir output dir
* @throws Exception failure.
*/
private void validateMapFileOutputContent(
FileSystem fs, Path dir) throws Exception {
// map output is a directory with index and data files
assertPathExists("Map output", dir);
Path expectedMapDir = getPart0000(dir);
assertPathExists("Map output", expectedMapDir);
assertIsDirectory(expectedMapDir);
FileStatus[] files = fs.listStatus(expectedMapDir);
Assertions.assertThat(files)
.describedAs("Files found in " + expectedMapDir)
.hasSizeGreaterThan(0);
assertPathExists("index file in " + expectedMapDir,
new Path(expectedMapDir, MapFile.INDEX_FILE_NAME));
assertPathExists("data file in " + expectedMapDir,
new Path(expectedMapDir, MapFile.DATA_FILE_NAME));
}
/**
* Dump all MPUs in the filesystem.
* @throws IOException IO failure
*/
protected void dumpMultipartUploads() throws IOException {
countMultipartUploads("");
}
/**
* Full test of the expected lifecycle: start job, task, write, commit task,
* commit job.
* @throws Exception on a failure
*/
@Test
public void testCommitLifecycle() throws Exception {
describe("Full test of the expected lifecycle:\n" +
" start job, task, write, commit task, commit job.\n" +
"Verify:\n" +
"* no files are visible after task commit\n" +
"* the expected file is visible after job commit\n" +
"* no outstanding MPUs after job commit");
JobData jobData = startJob(false);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
validateTaskAttemptWorkingDirectory(committer, tContext);
// write output
describe("1. Writing output");
writeTextOutput(tContext);
dumpMultipartUploads();
describe("2. Committing task");
assertTrue(committer.needsTaskCommit(tContext),
"No files to commit were found by " + committer);
commitTask(committer, tContext);
// this is only task commit; there MUST be no part- files in the dest dir
try {
applyLocatedFiles(getFileSystem().listFiles(outDir, false),
(status) -> Assertions.assertThat(status.getPath().toString())
.describedAs("task committed file to dest :" + status)
.doesNotContain("part"));
} catch (FileNotFoundException ignored) {
log().info("Outdir {} is not created by task commit phase ",
outDir);
}
describe("3. Committing job");
assertMultipartUploadsPending(outDir);
commitJob(committer, jContext);
// validate output
describe("4. Validating content");
validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
assertNoMultipartUploadsPending(outDir);
}
@Test
public void testCommitWithStorageClassConfig() throws Exception {
describe("Commit with specific storage class configuration;" +
" expect the final file has correct storage class.");
Configuration conf = getConfiguration();
skipIfStorageClassTestsDisabled(conf);
conf.set(STORAGE_CLASS, STORAGE_CLASS_INTELLIGENT_TIERING);
JobData jobData = startJob(false);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
validateTaskAttemptWorkingDirectory(committer, tContext);
// write output
writeTextOutput(tContext);
// commit task
dumpMultipartUploads();
commitTask(committer, tContext);
// commit job
assertMultipartUploadsPending(outDir);
commitJob(committer, jContext);
// validate output
validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
assertNoMultipartUploadsPending(outDir);
// validate storage class
validateStorageClass(outDir, STORAGE_CLASS_INTELLIGENT_TIERING);
}
@Test
public void testCommitterWithDuplicatedCommit() throws Exception {
describe("Call a task then job commit twice;" +
"expect the second task commit to fail.");
JobData jobData = startJob(true);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
// do commit
commit(committer, jContext, tContext);
// validate output
validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
assertNoMultipartUploadsPending(outDir);
// commit task to fail on retry
// FNFE is not thrown in case of Magic committer when
// in memory commit data is enabled and hence skip the check.
boolean skipExpectFNFE = committer instanceof MagicS3GuardCommitter &&
isTrackMagicCommitsInMemoryEnabled(tContext.getConfiguration());
if (!skipExpectFNFE) {
expectFNFEonTaskCommit(committer, tContext);
}
}
/**
* HADOOP-17258. If a second task attempt is committed, it
* must succeed, and the output of the first TA, even if already
* committed, MUST NOT be visible in the final output.
* <p></p>
* What's important is not just that only one TA must succeed,
* but it must be the last one executed: it is that attempt's
* output which must end up in the destination directory.
*/
@Test
public void testTwoTaskAttemptsCommit() throws Exception {
describe("Commit two task attempts;" +
" expect the second attempt to succeed.");
JobData jobData = startJob(false);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
// do commit
describe("\ncommitting task");
// write output for TA 1,
Path outputTA1 = writeTextOutput(tContext);
// speculatively execute committer 2.
// jobconf with a different base to its parts.
Configuration conf2 = jobData.conf;
conf2.set("mapreduce.output.basename", "attempt2");
String attempt2 = "attempt_" + jobId + "_m_000000_1";
TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
conf2, ta2);
AbstractS3ACommitter committer2 = standardCommitterFactory
.createCommitter(tContext2);
setupCommitter(committer2, tContext2);
// write output for TA 2,
Path outputTA2 = writeTextOutput(tContext2);
// verify the names are different.
String name1 = outputTA1.getName();
String name2 = outputTA2.getName();
Assertions.assertThat(name1)
.describedAs("name of task attempt output %s", outputTA1)
.isNotEqualTo(name2);
// commit task 1
committer.commitTask(tContext);
// then pretend that task1 didn't respond, so
// commit task 2
committer2.commitTask(tContext2);
// and the job
committer2.commitJob(tContext);
// validate output
S3AFileSystem fs = getFileSystem();
SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1,
"");
Assertions.assertThat(successData.getFilenames())
.describedAs("Files committed")
.hasSize(1);
assertPathExists("attempt2 output", new Path(outDir, name2));
assertPathDoesNotExist("attempt1 output", new Path(outDir, name1));
assertNoMultipartUploadsPending(outDir);
}
/**
* Override point: is a _SUCCESS marker expected in the output?
* @return true by default.
*/
protected boolean shouldExpectSuccessMarker() {
return true;
}
/**
* Simulate a failure on the first job commit; expect the
* second to succeed.
*/
@Test
public void testCommitterWithFailure() throws Exception {
describe("Fail the first job commit then retry");
JobData jobData = startJob(new FailingCommitterFactory(), true);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
// do commit
committer.commitTask(tContext);
// now fail job
expectSimulatedFailureOnJobCommit(jContext, committer);
commitJob(committer, jContext);
// but the data got there, due to the order of operations.
validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
expectJobCommitToFail(jContext, committer);
}
/**
* Override point: the failure expected on the attempt to commit a failed
* job.
* @param jContext job context
* @param committer committer
* @throws Exception any unexpected failure.
*/
protected void expectJobCommitToFail(JobContext jContext,
AbstractS3ACommitter committer) throws Exception {
// next attempt will fail as there is no longer a directory to commit
expectJobCommitFailure(jContext, committer,
FileNotFoundException.class);
}
/**
* Expect a job commit operation to fail with a specific exception.
* @param jContext job context
* @param committer committer
* @param clazz class of exception
* @return the caught exception
* @throws Exception any unexpected failure.
*/
protected static <E extends IOException> E expectJobCommitFailure(
JobContext jContext,
AbstractS3ACommitter committer,
Class<E> clazz)
throws Exception {
return intercept(clazz,
() -> {
committer.commitJob(jContext);
return committer.toString();
});
}
protected static void expectFNFEonTaskCommit(
AbstractS3ACommitter committer,
TaskAttemptContext tContext) throws Exception {
intercept(FileNotFoundException.class,
() -> {
committer.commitTask(tContext);
return committer.toString();
});
}
/**
* Commit a task which has produced no output; expect the
* commit to succeed and the task attempt path to be removed.
*/
@Test
public void testCommitterWithNoOutputs() throws Exception {
describe("Have a task and job with no outputs: expect success");
JobData jobData = startJob(new FailingCommitterFactory(), false);
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
// do commit
committer.commitTask(tContext);
assertTaskAttemptPathDoesNotExist(committer, tContext);
}
protected static void expectSimulatedFailureOnJobCommit(JobContext jContext,
AbstractS3ACommitter committer) throws Exception {
((CommitterFaultInjection) committer).setFaults(
CommitterFaultInjection.Faults.commitJob);
expectJobCommitFailure(jContext, committer,
CommitterFaultInjectionImpl.Failure.class);
}
@Test
public void testMapFileOutputCommitter() throws Exception {
describe("Test that the committer generates map output into a directory\n" +
"starting with the prefix part-");
JobData jobData = startJob(false);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
Configuration conf = jobData.conf;
// write output
writeMapFileOutput(new MapFileOutputFormat().getRecordWriter(tContext),
tContext);
// do commit
commit(committer, jContext, tContext);
S3AFileSystem fs = getFileSystem();
lsR(fs, outDir, true);
String ls = ls(outDir);
describe("\nvalidating");
// validate output
verifySuccessMarker(outDir, committer.getUUID());
describe("validate output of %s", outDir);
validateMapFileOutputContent(fs, outDir);
// Ensure getReaders call works and also ignores
// hidden filenames (_ or . prefixes)
describe("listing");
FileStatus[] filtered = fs.listStatus(outDir, HIDDEN_FILE_FILTER);
Assertions.assertThat(filtered)
.describedAs("listed children under " + ls)
.hasSize(1);
FileStatus fileStatus = filtered[0];
Assertions.assertThat(fileStatus.getPath().getName())
.describedAs("Not a part file: " + fileStatus)
.startsWith(PART_00000);
describe("getReaders()");
Assertions.assertThat(getReaders(fs, outDir, conf))
.describedAs("Number of MapFile.Reader entries with shared FS %s: %s",
outDir, ls)
.hasSize(1);
describe("getReaders(new FS)");
FileSystem fs2 = FileSystem.get(outDir.toUri(), conf);
Assertions.assertThat(getReaders(fs2, outDir, conf))
.describedAs("Number of MapFile.Reader entries with shared FS2 %s: %s",
outDir, ls)
.hasSize(1);
describe("MapFileOutputFormat.getReaders");
Assertions.assertThat(MapFileOutputFormat.getReaders(outDir, conf))
.describedAs("Number of MapFile.Reader entries with new FS in %s: %s",
outDir, ls)
.hasSize(1);
}
/** Open the output generated by this format. */
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private static MapFile.Reader[] getReaders(FileSystem fs,
Path dir,
Configuration conf) throws IOException {
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, HIDDEN_FILE_FILTER));
// sort names, so that hash partitioning works
Arrays.sort(names);
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new MapFile.Reader(names[i], conf);
}
return parts;
}
/**
* A functional interface which an action to test must implement.
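* Typically supplied as a lambda, e.g.
* {@code (job, jc, tc, committer) -> committer.abortTask(tc)}.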
*/
@FunctionalInterface
public interface ActionToTest {
void exec(Job job, JobContext jContext, TaskAttemptContext tContext,
AbstractS3ACommitter committer) throws Exception;
}
@Test
public void testAbortTaskNoWorkDone() throws Exception {
executeWork("abort task no work",
(job, jContext, tContext, committer) ->
committer.abortTask(tContext));
}
@Test
public void testAbortJobNoWorkDone() throws Exception {
executeWork("abort task no work",
(job, jContext, tContext, committer) ->
committer.abortJob(jContext, JobStatus.State.RUNNING));
}
@Test
public void testCommitJobButNotTask() throws Exception {
executeWork("commit a job while a task's work is pending, " +
"expect task writes to be cancelled.",
(job, jContext, tContext, committer) -> {
// step 1: write the text
writeTextOutput(tContext);
// step 2: commit the job
createCommitter(tContext).commitJob(tContext);
// verify that no output can be observed
assertPart0000DoesNotExist(outDir);
// that includes, no pending MPUs; commitJob is expected to
// cancel any.
assertNoMultipartUploadsPending(outDir);
}
);
}
@Test
public void testAbortTaskThenJob() throws Exception {
JobData jobData = startJob(true);
AbstractS3ACommitter committer = jobData.committer;
// do abort
committer.abortTask(jobData.tContext);
intercept(FileNotFoundException.class, "",
() -> getPart0000(committer.getWorkPath()));
committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
assertJobAbortCleanedUp(jobData);
}
/**
* Extension point: assert that the job was all cleaned up after an abort.
* Base assertions
* <ul>
* <li>Output dir is absent or, if present, empty</li>
* <li>No pending MPUs to/under the output dir</li>
* </ul>
* @param jobData job data
* @throws Exception failure
*/
public void assertJobAbortCleanedUp(JobData jobData) throws Exception {
// special handling of magic directory; harmless in staging
S3AFileSystem fs = getFileSystem();
try {
FileStatus[] children = listChildren(fs, outDir);
if (children.length != 0) {
lsR(fs, outDir, true);
}
assertArrayEquals(new FileStatus[0], children,
"Output directory not empty " + ls(outDir));
} catch (FileNotFoundException e) {
// this is a valid failure mode; it means the dest dir doesn't exist yet.
}
assertNoMultipartUploadsPending(outDir);
}
@Test
public void testFailAbort() throws Exception {
describe("Abort the task, then job (failed), abort the job again");
JobData jobData = startJob(true);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
// do abort
committer.abortTask(tContext);
committer.getJobAttemptPath(jContext);
committer.getTaskAttemptPath(tContext);
assertPart0000DoesNotExist(outDir);
assertSuccessMarkerDoesNotExist(outDir);
describe("Aborting job into %s", outDir);
committer.abortJob(jContext, JobStatus.State.FAILED);
assertTaskAttemptPathDoesNotExist(committer, tContext);
assertJobAttemptPathDoesNotExist(committer, jContext);
// try again; expect abort to be idempotent.
committer.abortJob(jContext, JobStatus.State.FAILED);
assertNoMultipartUploadsPending(outDir);
}
public void assertPart0000DoesNotExist(Path dir) throws Exception {
intercept(FileNotFoundException.class,
() -> getPart0000(dir));
assertPathDoesNotExist("expected output file", new Path(dir, PART_00000));
}
@Test
public void testAbortJobNotTask() throws Exception {
executeWork("abort task no work",
(job, jContext, tContext, committer) -> {
// write output
writeTextOutput(tContext);
committer.abortJob(jContext, JobStatus.State.RUNNING);
assertTaskAttemptPathDoesNotExist(
committer, tContext);
assertJobAttemptPathDoesNotExist(
committer, jContext);
assertNoMultipartUploadsPending(outDir);
});
}
/**
* This looks at what happens with concurrent commits.
* However, the failure condition it looks for (subdir under subdir)
* is the kind of failure you see on a rename-based commit.
*
* What it will not detect is the fact that both tasks will each commit
* to the destination directory. That is: whichever commits last wins.
*
* There's no way to stop this. Instead it is a requirement that the task
* commit operation is only executed when the committer is happy to
* commit only those tasks which it knows have succeeded, and abort those
* which have not.
* @throws Exception failure
*/
@Test
public void testConcurrentCommitTaskWithSubDir() throws Exception {
Job job = newJob();
FileOutputFormat.setOutputPath(job, outDir);
final Configuration conf = job.getConfiguration();
final JobContext jContext =
new JobContextImpl(conf, taskAttempt0.getJobID());
AbstractS3ACommitter amCommitter = createCommitter(
new TaskAttemptContextImpl(conf, taskAttempt0));
amCommitter.setupJob(jContext);
final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
taCtx[0] = new TaskAttemptContextImpl(conf, taskAttempt0);
taCtx[1] = new TaskAttemptContextImpl(conf, taskAttempt1);
final TextOutputFormat[] tof = new LoggingTextOutputFormat[2];
for (int i = 0; i < tof.length; i++) {
tof[i] = new LoggingTextOutputFormat() {
@Override
public Path getDefaultWorkFile(
TaskAttemptContext context,
String extension) throws IOException {
final AbstractS3ACommitter foc = (AbstractS3ACommitter)
getOutputCommitter(context);
return new Path(new Path(foc.getWorkPath(), SUB_DIR),
getUniqueFile(context, getOutputName(context), extension));
}
};
}
final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;
executor.submit(() -> {
final OutputCommitter outputCommitter =
tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
outputCommitter.setupTask(taCtx[taskIdx]);
final RecordWriter rw =
tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
writeOutput(rw, taCtx[taskIdx]);
describe("Committing Task %d", taskIdx);
outputCommitter.commitTask(taCtx[taskIdx]);
return null;
});
}
} finally {
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
log().info("Awaiting thread termination!");
}
}
// if we commit here then all tasks will be committed, so there will
// be contention for that final directory: both parts will go in.
describe("\nCommitting Job");
amCommitter.commitJob(jContext);
assertPathExists("base output directory", outDir);
assertPart0000DoesNotExist(outDir);
Path outSubDir = new Path(outDir, SUB_DIR);
assertPathDoesNotExist("Must not end up with sub_dir/sub_dir",
new Path(outSubDir, SUB_DIR));
// validate output
// There's no success marker in the subdirectory
validateContent(outSubDir, false, "");
}
/**
* Create a committer which fails; the class
* {@link CommitterFaultInjectionImpl} implements the logic.
* @param tContext task context
* @return committer instance
* @throws IOException failure to instantiate
*/
protected abstract AbstractS3ACommitter createFailingCommitter(
TaskAttemptContext tContext) throws IOException;
/**
* Factory for failing committers.
*/
public class FailingCommitterFactory implements CommitterFactory {
@Override
public AbstractS3ACommitter createCommitter(TaskAttemptContext context)
throws IOException {
return createFailingCommitter(context);
}
}
@Test
public void testOutputFormatIntegration() throws Throwable {
Configuration conf = getConfiguration();
Job job = newJob();
job.setOutputFormatClass(LoggingTextOutputFormat.class);
conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
taskAttempt0);
LoggingTextOutputFormat outputFormat = (LoggingTextOutputFormat)
ReflectionUtils.newInstance(tContext.getOutputFormatClass(), conf);
AbstractS3ACommitter committer = (AbstractS3ACommitter)
outputFormat.getOutputCommitter(tContext);
// setup
JobData jobData = new JobData(job, jContext, tContext, committer);
setup(jobData);
abortInTeardown(jobData);
LoggingTextOutputFormat.LoggingLineRecordWriter recordWriter
= outputFormat.getRecordWriter(tContext);
IntWritable iw = new IntWritable(1);
recordWriter.write(iw, iw);
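// a single "1\t1\n" record: four bytes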
long expectedLength = 4;
Path dest = recordWriter.getDest();
validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID());
recordWriter.close(tContext);
// at this point
// Skip validation when commit data is stored in memory
if (!isTrackMagicCommitsInMemoryEnabled(conf)) {
validateTaskAttemptPathAfterWrite(dest, expectedLength);
}
assertTrue(committer.needsTaskCommit(tContext),
"Committer does not have data to commit " + committer);
commitTask(committer, tContext);
// at this point the committer tasks stats should be current.
IOStatisticsSnapshot snapshot = new IOStatisticsSnapshot(
committer.getIOStatistics());
String commitsCompleted = COMMITTER_TASKS_SUCCEEDED.getSymbol();
assertThatStatisticCounter(snapshot, commitsCompleted)
.describedAs("task commit count")
.isEqualTo(1L);
commitJob(committer, jContext);
LOG.info("committer iostatistics {}",
ioStatisticsSourceToString(committer));
// validate output
SuccessData successData = verifySuccessMarker(outDir, committer.getUUID());
// the task commit count should get through the job commit
IOStatisticsSnapshot successStats = successData.getIOStatistics();
LOG.info("loaded statistics {}", successStats);
assertThatStatisticCounter(successStats, commitsCompleted)
.describedAs("task commit count")
.isEqualTo(1L);
}
/**
* Create a committer through reflection then use it to abort
* a task. This mimics the action of an AM when a container fails and
* the AM wants to abort the task attempt.
*/
@Test
public void testAMWorkflow() throws Throwable {
describe("Create a committer with a null output path & use as an AM");
JobData jobData = startJob(true);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
TaskAttemptContext newAttempt = taskAttemptForJob(
MRBuilderUtils.newJobId(1, 1, 1), jContext);
Configuration conf = jContext.getConfiguration();
// bind
LoggingTextOutputFormat.bind(conf);
OutputFormat<?, ?> outputFormat
= ReflectionUtils.newInstance(newAttempt
.getOutputFormatClass(), conf);
Path outputPath = FileOutputFormat.getOutputPath(newAttempt);
assertNotNull(outputPath, "null output path in new task attempt");
AbstractS3ACommitter committer2 = (AbstractS3ACommitter)
outputFormat.getOutputCommitter(newAttempt);
committer2.abortTask(tContext);
assertNoMultipartUploadsPending(getOutDir());
}
@Test
public void testParallelJobsToAdjacentPaths() throws Throwable {
describe("Run two jobs in parallel, assert they both complete");
JobData jobData = startJob(true);
Job job1 = jobData.job;
AbstractS3ACommitter committer1 = jobData.committer;
JobContext jContext1 = jobData.jContext;
TaskAttemptContext tContext1 = jobData.tContext;
// now build up a second job
String jobId2 = randomJobId();
String attempt20 = "attempt_" + jobId2 + "_m_000000_0";
TaskAttemptID taskAttempt20 = TaskAttemptID.forName(attempt20);
String attempt21 = "attempt_" + jobId2 + "_m_000001_0";
TaskAttemptID taskAttempt21 = TaskAttemptID.forName(attempt21);
Path job1Dest = outDir;
Path job2Dest = new Path(getOutDir().getParent(),
getMethodName() + "job2Dest");
// little safety check
assertNotEquals(job1Dest, job2Dest);
// create the second job
Job job2 = newJob(job2Dest,
unsetUUIDOptions(new JobConf(getConfiguration())),
attempt20);
Configuration conf2 = job2.getConfiguration();
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
try {
JobContext jContext2 = new JobContextImpl(conf2,
taskAttempt20.getJobID());
TaskAttemptContext tContext2 =
new TaskAttemptContextImpl(conf2, taskAttempt20);
AbstractS3ACommitter committer2 = createCommitter(job2Dest, tContext2);
JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
setup(jobData2);
abortInTeardown(jobData2);
// make sure the directories are different
assertNotEquals(committer1.getOutputPath(),
committer2.getOutputPath(), "Committer output paths");
assertNotEquals(committer1.getUUID(),
committer2.getUUID(), "job UUIDs");
// job2 setup, write some data there
writeTextOutput(tContext2);
// at this point, job1 and job2 both have uncommitted tasks
// commit tasks in order task 2, task 1.
commitTask(committer2, tContext2);
commitTask(committer1, tContext1);
assertMultipartUploadsPending(job1Dest);
assertMultipartUploadsPending(job2Dest);
// commit jobs in order job 1, job 2
commitJob(committer1, jContext1);
assertNoMultipartUploadsPending(job1Dest);
getPart0000(job1Dest);
assertMultipartUploadsPending(job2Dest);
commitJob(committer2, jContext2);
getPart0000(job2Dest);
assertNoMultipartUploadsPending(job2Dest);
} finally {
// uncommitted files to this path need to be deleted in tests which fail
abortMultipartUploadsUnderPath(job2Dest);
}
}
/**
* Run two jobs with the same destination and different output paths.
* <p></p>
* This only works if the jobs are set to NOT delete all outstanding
* uploads under the destination path.
* <p></p>
* See HADOOP-17318.
*/
@Test
public void testParallelJobsToSameDestination() throws Throwable {
describe("Run two jobs to the same destination, assert they both complete");
Configuration conf = getConfiguration();
conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
// this job has a job ID generated and set as the spark UUID;
// the config is also set to require it.
// This mimics the Spark setup process.
String stage1Id = UUID.randomUUID().toString();
conf.set(SPARK_WRITE_UUID, stage1Id);
conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
// create the job and write data in its task attempt
JobData jobData = startJob(true);
Job job1 = jobData.job;
AbstractS3ACommitter committer1 = jobData.committer;
JobContext jContext1 = jobData.jContext;
TaskAttemptContext tContext1 = jobData.tContext;
Path job1TaskOutputFile = jobData.writtenTextPath;
// the write path
Assertions.assertThat(committer1.getWorkPath().toString())
.describedAs("Work path path of %s", committer1)
.contains(stage1Id);
// now build up a second job
String jobId2 = randomJobId();
// second job will use same ID
String attempt2 = taskAttempt0.toString();
TaskAttemptID taskAttempt2 = taskAttempt0;
// create the second job
Configuration c2 = unsetUUIDOptions(new JobConf(conf));
c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
Job job2 = newJob(outDir,
c2,
attempt2);
Configuration jobConf2 = job2.getConfiguration();
jobConf2.set("mapreduce.output.basename", "task2");
String stage2Id = UUID.randomUUID().toString();
jobConf2.set(SPARK_WRITE_UUID,
stage2Id);
JobContext jContext2 = new JobContextImpl(jobConf2,
taskAttempt2.getJobID());
TaskAttemptContext tContext2 =
new TaskAttemptContextImpl(jobConf2, taskAttempt2);
AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
.describedAs("Job attempt path of %s", committer2)
.isNotEqualTo(committer1.getJobAttemptPath(jContext1));
Assertions.assertThat(committer2.getTaskAttemptPath(tContext2))
.describedAs("Task attempt path of %s", committer2)
.isNotEqualTo(committer1.getTaskAttemptPath(tContext1));
Assertions.assertThat(committer2.getWorkPath().toString())
.describedAs("Work path path of %s", committer2)
.isNotEqualTo(committer1.getWorkPath().toString())
.contains(stage2Id);
Assertions.assertThat(committer2.getUUIDSource())
.describedAs("UUID source of %s", committer2)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID);
JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
setup(jobData2);
abortInTeardown(jobData2);
// the sequence is designed to ensure that job2 has active multipart
// uploads during/after job1's work
// if the committer is a magic committer, MPUs start in the write,
// otherwise in task commit.
boolean multipartInitiatedInWrite =
committer2 instanceof MagicS3GuardCommitter;
// job2. Here we start writing a file and have that write in progress
// when job 1 commits.
LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter(
tContext2);
LOG.info("Commit Task 1");
commitTask(committer1, tContext1);
if (multipartInitiatedInWrite) {
// magic committer runs -commit job1 while a job2 TA has an open
// writer (and hence: open MP Upload)
LOG.info("With Multipart Initiated In Write: Commit Job 1");
commitJob(committer1, jContext1);
}
// job2/task writes its output to the destination and
// closes the file
writeOutput(recordWriter2, tContext2);
// get the output file
Path job2TaskOutputFile = recordWriter2.getDest();
// commit the second task
LOG.info("Commit Task 2");
commitTask(committer2, tContext2);
if (!multipartInitiatedInWrite) {
// if not a magic committer, commit the job now. Because at
// this point the staging committer tasks from job2 will be pending
LOG.info("With Multipart NOT Initiated In Write: Commit Job 1");
assertJobAttemptPathExists(committer1, jContext1);
commitJob(committer1, jContext1);
}
// run the warning scan code, which will find output.
// this can be manually reviewed in the logs to verify
// readability
committer2.warnOnActiveUploads(outDir);
// and second job
LOG.info("Commit Job 2");
assertJobAttemptPathExists(committer2, jContext2);
commitJob(committer2, jContext2);
// validate the output
Path job1Output = new Path(outDir, job1TaskOutputFile.getName());
Path job2Output = new Path(outDir, job2TaskOutputFile.getName());
assertNotEquals(job1Output, job2Output,
"Job output file filenames must be different");
// job1 output must be there
assertPathExists("job 1 output", job1Output);
// job 2 file is there
assertPathExists("job 2 output", job2Output);
// and nothing is pending
assertNoMultipartUploadsPending(outDir);
}
/**
* Verify self-generated UUID logic.
* A committer used for job setup can also be used for task setup,
* but a committer which self-generated its job ID and was only
* used for task setup is rejected.
* Task abort will still work.
*/
@Test
public void testSelfGeneratedUUID() throws Throwable {
describe("Run two jobs to the same destination, assert they both complete");
Configuration conf = getConfiguration();
unsetUUIDOptions(conf);
// job is set to generate UUIDs
conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
// create the job. don't write anything
JobData jobData = startJob(false);
AbstractS3ACommitter committer = jobData.committer;
String uuid = committer.getUUID();
Assertions.assertThat(committer.getUUIDSource())
.describedAs("UUID source of %s", committer)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);
// examine the job configuration and verify that it has been updated
Configuration jobConf = jobData.conf;
Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID, null))
.describedAs("Config option " + FS_S3A_COMMITTER_UUID)
.isEqualTo(uuid);
Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, null))
.describedAs("Config option " + FS_S3A_COMMITTER_UUID_SOURCE)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally
.getText());
// because the task was set up in the job, it can have task
// setup called, even though it had a random ID.
committer.setupTask(jobData.tContext);
// but a new committer will not be set up
TaskAttemptContext tContext2 =
new TaskAttemptContextImpl(conf, taskAttempt1);
AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
Assertions.assertThat(committer2.getUUIDSource())
.describedAs("UUID source of %s", committer2)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);
assertNotEquals(committer.getUUID(),
committer2.getUUID(), "job UUIDs");
// Task setup MUST fail.
intercept(PathCommitException.class,
E_SELF_GENERATED_JOB_UUID, () -> {
committer2.setupTask(tContext2);
return committer2;
});
// task abort with the self-generated option is fine.
committer2.abortTask(tContext2);
}
/**
* Verify that when a committer is required to have a propagated
* UUID and is instantiated without one, it fails early.
*/
@Test
public void testRequirePropagatedUUID() throws Throwable {
Configuration conf = getConfiguration();
unsetUUIDOptions(conf);
conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
// create the job, expect a failure, even if UUID generation
// is enabled.
intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
startJob(false));
}
/**
* Strip staging/spark UUID options.
* @param conf config
* @return the patched config
*/
protected Configuration unsetUUIDOptions(final Configuration conf) {
conf.unset(SPARK_WRITE_UUID);
conf.unset(FS_S3A_COMMITTER_UUID);
conf.unset(FS_S3A_COMMITTER_GENERATE_UUID);
conf.unset(FS_S3A_COMMITTER_REQUIRE_UUID);
return conf;
}
/**
* Assert that a committer's job attempt path exists.
* For the staging committers, this is in the cluster FS.
* @param committer committer
* @param jobContext job context
* @throws IOException failure
*/
protected void assertJobAttemptPathExists(
final AbstractS3ACommitter committer,
final JobContext jobContext) throws IOException {
Path attemptPath = committer.getJobAttemptPath(jobContext);
ContractTestUtils.assertIsDirectory(
attemptPath.getFileSystem(committer.getConf()),
attemptPath);
}
@Test
public void testS3ACommitterFactoryBinding() throws Throwable {
describe("Verify that the committer factory returns this "
+ "committer when configured to do so");
Job job = newJob();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
taskAttempt0);
S3ACommitterFactory factory = new S3ACommitterFactory();
Assertions.assertThat(factory.createOutputCommitter(outDir, tContext).getClass())
.describedAs("Committer from factory with name %s", getCommitterName())
.isEqualTo(createCommitter(outDir, tContext).getClass());
}
/**
* Validate the path of a file being written to during the write
* itself.
* @param p path
* @param expectedLength expected length of the data written
* @param jobId job id
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength,
String jobId) throws IOException {
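// no-op by default; an override point for committer-specific checks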
}
/**
* Validate the path of a file being written to after the write
* operation has completed.
* @param p path
* @param expectedLength expected length of the data written
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathAfterWrite(Path p,
final long expectedLength) throws IOException {
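// no-op by default; an override point for committer-specific checks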
}
/**
* Perform any actions needed to validate the working directory of
* a committer.
* For example: filesystem, path attributes
* @param committer committer instance
* @param context task attempt context
* @throws IOException IO failure
*/
protected void validateTaskAttemptWorkingDirectory(
AbstractS3ACommitter committer,
TaskAttemptContext context) throws IOException {
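// no-op by default; an override point for committer-specific checks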
}
/**
* Commit a task then validate the state of the committer afterwards.
* @param committer committer
* @param tContext task context
* @throws IOException IO failure
*/
protected void commitTask(final AbstractS3ACommitter committer,
final TaskAttemptContext tContext) throws IOException {
committer.commitTask(tContext);
}
/**
* Commit a job then validate the state of the committer afterwards.
* @param committer committer
* @param jContext job context
* @throws IOException IO failure
*/
protected void commitJob(final AbstractS3ACommitter committer,
final JobContext jContext) throws IOException {
committer.commitJob(jContext);
}
}