TestManifestCommitProtocol.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.RemoteIterators;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_ABORT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.randomJobId;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * This is a contract test for the commit protocol on a target filesystem.
 * It is subclassed in the ABFS integration tests and elsewhere.
 * Derived from the S3A protocol suite, which was itself based off
 * the test suite {@code TestFileOutputCommitter}.
 *
 * Some of the methods trigger java warnings about unchecked casts;
 * it's impossible to remove them, so the checks are suppressed.
 */
@SuppressWarnings("unchecked")
public class TestManifestCommitProtocol
    extends AbstractManifestCommitterTest {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestManifestCommitProtocol.class);

  private static final String SUB_DIR = "SUB_DIR";

  /**
   * Part of the name of the output of task attempt 0.
   */
  protected static final String PART_00000 = "part-m-00000";

  private static final Text KEY_1 = new Text("key1");

  private static final Text KEY_2 = new Text("key2");

  private static final Text VAL_1 = new Text("val1");

  private static final Text VAL_2 = new Text("val2");

  /**
   * Snapshot of stats, which will be collected from
   * committers.
   */
  private static final IOStatisticsSnapshot IOSTATISTICS =
      IOStatisticsSupport.snapshotIOStatistics();

  /**
   * Job ID for jobs.
   */
  private final String jobId;

  /**
   * A random task attempt id for testing.
   */
  private final String attempt0;

  /**
   *  Attempt 0's task attempt ID.
   */
  private final TaskAttemptID taskAttempt0;

  /**
   * TA 1.
   */
  private final TaskAttemptID taskAttempt1;

  /**
   * Attempt 1 string value.
   */
  private final String attempt1;


  /** A job to abort in test case teardown. */
  private final List<JobData> abortInTeardown = new ArrayList<>(1);

  /**
   * Output directory.
   * This is the directory into which output goes;
   * all the job files go in _temporary underneath.
   */
  private Path outputDir;

  /**
   * Committer factory which calls back into
   * {@link #createCommitter(Path, TaskAttemptContext)}.
   */
  private final LocalCommitterFactory
      localCommitterFactory = new LocalCommitterFactory();

  /**
   * Clean up the output dir. No-op if
   * {@link #outputDir} is null.
   * @throws IOException failure to delete
   */
  private void cleanupOutputDir() throws IOException {
    if (outputDir != null) {
      getFileSystem().delete(outputDir, true);
    }
  }

  /**
   * Constructor.
   */
  public TestManifestCommitProtocol() {
    ManifestCommitterTestSupport.JobAndTaskIDsForTests taskIDs
        = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);
    jobId = taskIDs.getJobId();
    attempt0 = taskIDs.getTaskAttempt(0, 0);
    taskAttempt0 = taskIDs.getTaskAttemptIdType(0, 0);
    attempt1 = taskIDs.getTaskAttempt(0, 1);
    taskAttempt1 = taskIDs.getTaskAttemptIdType(0, 1);
  }

  /**
   * This must return the name of a suite which is unique to the test.
   * @return a string which must be unique and a valid path.
   */
  protected String suitename() {
    return "TestManifestCommitProtocolLocalFS";
  }

  /**
   * Get the log; can be overridden for test case log.
   * @return a log.
   */
  public Logger log() {
    return LOG;
  }

  /**
   * Overridden method returns the suitename as well as the method name,
   * so if more than one committer test is run in parallel, paths are
   * isolated.
   * @return a name for a method, unique across the suites and test cases.
   */
  @Override
  protected String getMethodName() {
    return suitename() + "-" + super.getMethodName();
  }

  @Override
  public void setup() throws Exception {
    super.setup();

    outputDir = path(getMethodName());
    cleanupOutputDir();
  }

  @Override
  public void teardown() throws Exception {
    describe("teardown");
    Thread.currentThread().setName("teardown");
    for (JobData jobData : abortInTeardown) {
      // stop the job
      abortJobQuietly(jobData);
      // and then get its statistics
      IOSTATISTICS.aggregate(jobData.committer.getIOStatistics());
    }
    try {
      cleanupOutputDir();
    } catch (IOException e) {
      log().info("Exception during cleanup", e);
    }
    super.teardown();
  }

  @AfterClass
  public static void logAggregateIOStatistics() {
    LOG.info("Final IOStatistics {}",
        ioStatisticsToPrettyString(IOSTATISTICS));
  }

  /**
   * Add the specified job to the current list of jobs to abort in teardown.
   * @param jobData job data.
   */
  protected void abortInTeardown(JobData jobData) {
    abortInTeardown.add(jobData);
  }

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    bindCommitter(conf);
    return conf;
  }

  /***
   * Set job up to use the manifest committer.
   * @param conf configuration to set up
   */
  protected void bindCommitter(Configuration conf) {
    conf.set(COMMITTER_FACTORY_CLASS, MANIFEST_COMMITTER_FACTORY);
  }

  /**
   * Create a committer for a task.
   * @param context task context
   * @return new committer
   * @throws IOException failure
   */
  protected ManifestCommitter createCommitter(
      TaskAttemptContext context) throws IOException {
    return createCommitter(getOutputDir(), context);
  }

  /**
   * Create a committer for a task and a given output path.
   * @param outputPath path
   * @param context task context
   * @return new committer
   * @throws IOException failure
   */
  protected ManifestCommitter createCommitter(
      Path outputPath,
      TaskAttemptContext context) throws IOException {
    return new ManifestCommitter(outputPath, context);
  }

  protected Path getOutputDir() {
    return outputDir;
  }

  protected String getJobId() {
    return jobId;
  }

  protected String getAttempt0() {
    return attempt0;
  }

  protected TaskAttemptID getTaskAttempt0() {
    return taskAttempt0;
  }

  protected String getAttempt1() {
    return attempt1;
  }

  protected TaskAttemptID getTaskAttempt1() {
    return taskAttempt1;
  }

  /**
   * Functional interface for creating committers, designed to allow
   * different factories to be used to create different failure modes.
   */
  @FunctionalInterface
  public interface CommitterFactory {

    /**
     * Create a committer for a task.
     * @param context task context
     * @return new committer
     * @throws IOException failure
     */
    ManifestCommitter createCommitter(
        TaskAttemptContext context) throws IOException;
  }

  /**
   * The normal committer creation factory, uses the abstract methods
   * in the class.
   */
  protected class LocalCommitterFactory implements CommitterFactory {

    @Override
    public ManifestCommitter createCommitter(TaskAttemptContext context)
        throws IOException {
      return TestManifestCommitProtocol.this
          .createCommitter(context);
    }
  }

  /**
   * Assert that for a given output, the job context returns a manifest
   * committer factory. This is what FileOutputFormat does internally,
   * and is needed to make sure that the relevant settings are being passed
   * around.
   * @param context job/task context
   * @param output destination path.
   */
  protected void assertCommitterFactoryIsManifestCommitter(
      JobContext context, Path output) {

    final Configuration conf = context.getConfiguration();
    // check one: committer
    assertConfigurationUsesManifestCommitter(conf);
    final String factoryName = conf.get(COMMITTER_FACTORY_CLASS, "");
    final PathOutputCommitterFactory factory
        = PathOutputCommitterFactory.getCommitterFactory(
        output,
        conf);
    Assertions.assertThat(factory)
        .describedAs("Committer for output path %s"
                + " and factory name \"%s\"",
            output, factoryName)
        .isInstanceOf(ManifestCommitterFactory.class);
  }

  /**
   * This is to debug situations where the test committer factory
   * on tasks was binding to FileOutputCommitter even when
   * tests were overriding it.
   * @param conf configuration to probe.
   */
  private void assertConfigurationUsesManifestCommitter(
      Configuration conf) {
    final String factoryName = conf.get(COMMITTER_FACTORY_CLASS, null);
    Assertions.assertThat(factoryName)
        .describedAs("Value of %s", COMMITTER_FACTORY_CLASS)
        .isEqualTo(MANIFEST_COMMITTER_FACTORY);
  }

  /**
   * Write some text out.
   * @param context task
   * @throws IOException IO failure
   * @throws InterruptedException write interrupted
   * @return the path written to
   */
  protected Path writeTextOutput(TaskAttemptContext context)
      throws IOException, InterruptedException {
    describe("write output");
    try (DurationInfo d = new DurationInfo(LOG,
        "Writing Text output for task %s", context.getTaskAttemptID())) {
      TextOutputForTests.LoggingLineRecordWriter<Writable, Object> writer
          = new TextOutputForTests<Writable, Object>().getRecordWriter(context);
      writeOutput(writer, context);
      return writer.getDest();
    }
  }

  /**
   * Write the standard output.
   * @param writer record writer
   * @param context task context
   * @throws IOException IO failure
   * @throws InterruptedException write interrupted
   */
  private void writeOutput(
      RecordWriter<Writable, Object> writer,
      TaskAttemptContext context) throws IOException, InterruptedException {
    NullWritable nullWritable = NullWritable.get();
    try (ManifestCommitterTestSupport.CloseWriter<Writable, Object> cw =
             new ManifestCommitterTestSupport.CloseWriter<>(writer, context)) {
      writer.write(KEY_1, VAL_1);
      writer.write(null, nullWritable);
      writer.write(null, VAL_1);
      writer.write(nullWritable, VAL_2);
      writer.write(KEY_2, nullWritable);
      writer.write(KEY_1, null);
      writer.write(null, null);
      writer.write(KEY_2, VAL_2);
      writer.close(context);
    }
  }

  /**
   * Write the output of a map.
   * @param writer record writer
   * @param context task context
   * @throws IOException IO failure
   * @throws InterruptedException write interrupted
   */
  private void writeMapFileOutput(RecordWriter<WritableComparable<?>, Writable> writer,
      TaskAttemptContext context) throws IOException, InterruptedException {
    describe("\nWrite map output");
    try (DurationInfo d = new DurationInfo(LOG,
        "Writing Text output for task %s", context.getTaskAttemptID());
         ManifestCommitterTestSupport.CloseWriter<WritableComparable<?>, Writable> cw =
             new ManifestCommitterTestSupport.CloseWriter<>(writer, context)) {
      for (int i = 0; i < 10; ++i) {
        Text val = ((i & 1) == 1) ? VAL_1 : VAL_2;
        writer.write(new LongWritable(i), val);
      }
      LOG.debug("Closing writer {}", writer);
      writer.close(context);
    }
  }

  /**
   * Details on a job for use in {@code startJob} and elsewhere.
   */
  protected static final class JobData {

    private final Job job;

    private final JobContext jContext;

    private final TaskAttemptContext tContext;

    private final ManifestCommitter committer;

    private final Configuration conf;

    private Path writtenTextPath; // null if not written to

    public JobData(Job job,
        JobContext jContext,
        TaskAttemptContext tContext,
        ManifestCommitter committer) {
      this.job = job;
      this.jContext = jContext;
      this.tContext = tContext;
      this.committer = committer;
      conf = job.getConfiguration();
    }

    public String jobId() {
      return committer.getJobUniqueId();
    }
  }

  /**
   * Create a new job. Sets the task attempt ID,
   * and output dir; asks for a success marker.
   * @return the new job
   * @throws IOException failure
   */
  public Job newJob() throws IOException {
    return newJob(outputDir, getConfiguration(), attempt0);
  }

  /**
   * Create a new job. Sets the task attempt ID,
   * and output dir; asks for a success marker.
   * Committer factory is set to manifest factory, so is independent
   * of FS schema.
   * @param dir dest dir
   * @param configuration config to get the job from
   * @param taskAttemptId task attempt
   * @return the new job
   * @throws IOException failure
   */
  private Job newJob(Path dir, Configuration configuration,
      String taskAttemptId) throws IOException {
    Job job = Job.getInstance(configuration);
    Configuration conf = job.getConfiguration();
    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId);
    enableManifestCommitter(conf);
    FileOutputFormat.setOutputPath(job, dir);
    return job;
  }

  /**
   * Start a job with a committer; optionally write the test data.
   * Always register the job to be aborted (quietly) in teardown.
   * This is, from an "OO-purity perspective" the wrong kind of method to
   * do: it's setting things up, mixing functionality, registering for teardown.
   * Its aim is simple though: a common body of code for starting work
   * in test cases.
   * @param writeText should the text be written?
   * @return the job data 4-tuple
   * @throws IOException IO problems
   * @throws InterruptedException interruption during write
   */
  protected JobData startJob(boolean writeText)
      throws IOException, InterruptedException {
    return startJob(localCommitterFactory, writeText);
  }

  /**
   * Start a job with a committer; optionally write the test data.
   * Always register the job to be aborted (quietly) in teardown.
   * This is, from an "OO-purity perspective" the wrong kind of method to
   * do: it's setting things up, mixing functionality, registering for teardown.
   * Its aim is simple though: a common body of code for starting work
   * in test cases.
   * @param factory the committer factory to use
   * @param writeText should the text be written?
   * @return the job data 4-tuple
   * @throws IOException IO problems
   * @throws InterruptedException interruption during write
   */
  protected JobData startJob(CommitterFactory factory, boolean writeText)
      throws IOException, InterruptedException {
    Job job = newJob();
    Configuration conf = job.getConfiguration();
    assertConfigurationUsesManifestCommitter(conf);
    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
    JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID());
    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
        taskAttempt0);
    ManifestCommitter committer = factory.createCommitter(tContext);

    // setup
    JobData jobData = new JobData(job, jContext, tContext, committer);
    setupJob(jobData);
    abortInTeardown(jobData);

    if (writeText) {
      // write output
      jobData.writtenTextPath = writeTextOutput(tContext);
    }
    return jobData;
  }

  /**
   * Set up the job and task.
   * @param jobData job data
   * @throws IOException problems
   */
  protected void setupJob(JobData jobData) throws IOException {
    ManifestCommitter committer = jobData.committer;
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;
    describe("\nsetup job");
    try (DurationInfo d = new DurationInfo(LOG,
        "setup job %s", jContext.getJobID())) {
      committer.setupJob(jContext);
    }
    setupCommitter(committer, tContext);
    describe("setup complete");
  }

  private void setupCommitter(
      final ManifestCommitter committer,
      final TaskAttemptContext tContext) throws IOException {
    try (DurationInfo d = new DurationInfo(LOG,
        "setup task %s", tContext.getTaskAttemptID())) {
      committer.setupTask(tContext);
    }
  }

  /**
   * Abort a job quietly.
   * @param jobData job info
   */
  protected void abortJobQuietly(JobData jobData) {
    abortJobQuietly(jobData.committer, jobData.jContext, jobData.tContext);
  }

  /**
   * Abort a job quietly: first task, then job.
   * @param committer committer
   * @param jContext job context
   * @param tContext task context
   */
  protected void abortJobQuietly(ManifestCommitter committer,
      JobContext jContext,
      TaskAttemptContext tContext) {
    describe("\naborting task");
    try {
      committer.abortTask(tContext);
    } catch (Exception e) {
      log().warn("Exception aborting task:", e);
    }
    describe("\naborting job");
    try {
      committer.abortJob(jContext, JobStatus.State.KILLED);
    } catch (Exception e) {
      log().warn("Exception aborting job", e);
    }
  }

  /**
   * Commit the task and then the job.
   * @param committer committer
   * @param jContext job context
   * @param tContext task context
   * @throws IOException problems
   */
  protected void commitTaskAndJob(ManifestCommitter committer,
      JobContext jContext,
      TaskAttemptContext tContext) throws IOException {
    try (DurationInfo d = new DurationInfo(LOG,
        "committing Job %s", jContext.getJobID())) {
      describe("\ncommitting task");
      committer.commitTask(tContext);
      describe("\ncommitting job");
      committer.commitJob(jContext);
      describe("commit complete\n");
    }
  }

  /**
   * Execute work as part of a test, after creating the job.
   * After the execution, {@link #abortJobQuietly(JobData)} is
   * called for abort/cleanup.
   * @param name name of work (for logging)
   * @param action action to execute
   * @throws Exception failure
   */
  protected void executeWork(String name, ActionToTest action)
      throws Exception {
    executeWork(name, startJob(false), action);
  }

  /**
   * Execute work as part of a test, against the created job.
   * After the execution, {@link #abortJobQuietly(JobData)} is
   * called for abort/cleanup.
   * @param name name of work (for logging)
   * @param jobData job info
   * @param action action to execute
   * @throws Exception failure
   */
  public void executeWork(String name,
      JobData jobData,
      ActionToTest action) throws Exception {
    try (DurationInfo d = new DurationInfo(LOG, "Executing %s", name)) {
      action.exec(jobData.job,
          jobData.jContext,
          jobData.tContext,
          jobData.committer);
    } finally {
      abortJobQuietly(jobData);
    }
  }

  /**
   * Load a manifest from the test FS.
   * @param path path
   * @return the manifest
   * @throws IOException failure to load
   */
  TaskManifest loadManifest(Path path) throws IOException {
    return TaskManifest.load(getFileSystem(), path);
  }

  /**
   * Verify that recovery doesn't work for these committers.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testRecoveryAndCleanup() throws Exception {
    describe("Test (unsupported) task recovery.");
    JobData jobData = startJob(true);
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;

    Assertions.assertThat(committer.getWorkPath())
        .as("null workPath in committer " + committer)
        .isNotNull();
    Assertions.assertThat(committer.getOutputPath())
        .as("null outputPath in committer " + committer)
        .isNotNull();

    // Commit the task.
    commitTask(committer, tContext);

    // load and log the manifest
    final TaskManifest manifest = loadManifest(
        committer.getTaskManifestPath(tContext));
    LOG.info("Manifest {}", manifest);

    Configuration conf2 = jobData.job.getConfiguration();
    conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
    conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
    JobContext jContext2 = new JobContextImpl(conf2, taskAttempt0.getJobID());
    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2,
        taskAttempt0);
    ManifestCommitter committer2 = createCommitter(tContext2);
    committer2.setupJob(tContext2);

    Assertions.assertThat(committer2.isRecoverySupported())
        .as("recoverySupported in " + committer2)
        .isFalse();
    intercept(IOException.class, "recover",
        () -> committer2.recoverTask(tContext2));

    // at this point, task attempt 0 has failed to recover
    // it should be abortable though. This will be a no-op as it already
    // committed
    describe("aborting task attempt 2; expect nothing to clean up");
    committer2.abortTask(tContext2);
    describe("Aborting job 2; expect pending commits to be aborted");
    committer2.abortJob(jContext2, JobStatus.State.KILLED);
  }

  /**
   * Assert that the task attempt FS Doesn't have a task attempt
   * directory.
   * @param committer committer
   * @param context task context
   * @throws IOException IO failure.
   */
  protected void assertTaskAttemptPathDoesNotExist(
      ManifestCommitter committer, TaskAttemptContext context)
      throws IOException {
    Path attemptPath = committer.getTaskAttemptPath(context);
    ContractTestUtils.assertPathDoesNotExist(
        attemptPath.getFileSystem(context.getConfiguration()),
        "task attempt dir",
        attemptPath);
  }

  protected void assertJobAttemptPathDoesNotExist(
      ManifestCommitter committer, JobContext context)
      throws IOException {
    Path attemptPath = committer.getJobAttemptPath(context);
    ContractTestUtils.assertPathDoesNotExist(
        attemptPath.getFileSystem(context.getConfiguration()),
        "job attempt dir",
        attemptPath);
  }

  /**
   * Verify the output of the directory.
   * That includes the {@code part-m-00000-*}
   * file existence and contents, as well as optionally, the success marker.
   * @param dir directory to scan.
   * @param expectSuccessMarker check the success marker?
   * @param expectedJobId job ID, verified if non-empty and success data loaded
   * @throws Exception failure.
   * @return the success data
   */
  private ManifestSuccessData validateContent(Path dir,
      boolean expectSuccessMarker,
      String expectedJobId) throws Exception {
    lsR(getFileSystem(), dir, true);
    ManifestSuccessData successData;
    if (expectSuccessMarker) {
      successData = verifySuccessMarker(dir, expectedJobId);
    } else {
      successData = null;
    }
    Path expectedFile = getPart0000(dir);
    log().debug("Validating content in {}", expectedFile);
    StringBuilder expectedOutput = new StringBuilder();
    expectedOutput.append(KEY_1).append('\t').append(VAL_1).append("\n");
    expectedOutput.append(VAL_1).append("\n");
    expectedOutput.append(VAL_2).append("\n");
    expectedOutput.append(KEY_2).append("\n");
    expectedOutput.append(KEY_1).append("\n");
    expectedOutput.append(KEY_2).append('\t').append(VAL_2).append("\n");
    String output = readFile(expectedFile);
    Assertions.assertThat(output)
        .describedAs("Content of %s", expectedFile)
        .isEqualTo(expectedOutput.toString());
    return successData;
  }

  /**
   * Identify any path under the directory which begins with the
   * {@code "part-m-00000"} sequence. There's some compensation for
   * eventual consistency here.
   * @param dir directory to scan
   * @return the full path
   * @throws FileNotFoundException the path is missing.
   * @throws Exception failure.
   */
  protected Path getPart0000(final Path dir) throws Exception {
    final FileSystem fs = dir.getFileSystem(getConfiguration());
    FileStatus[] statuses = fs.listStatus(dir,
        path -> path.getName().startsWith(PART_00000));
    if (statuses.length != 1) {
      // fail, with a listing of the parent dir
      ContractTestUtils.assertPathExists(fs, "Output file",
          new Path(dir, PART_00000));
    }
    return statuses[0].getPath();
  }

  /**
   * Look for the partFile subdir of the output dir
   * and the ma and data entries.
   * @param fs filesystem
   * @param dir output dir
   * @throws Exception failure.
   */
  private void validateMapFileOutputContent(
      FileSystem fs, Path dir) throws Exception {
    // map output is a directory with index and data files
    assertPathExists("Map output", dir);
    Path expectedMapDir = getPart0000(dir);
    assertPathExists("Map output", expectedMapDir);
    assertIsDirectory(expectedMapDir);
    FileStatus[] files = fs.listStatus(expectedMapDir);
    Assertions.assertThat(files)
        .as("No files found in " + expectedMapDir)
        .isNotEmpty();
    assertPathExists("index file in " + expectedMapDir,
        new Path(expectedMapDir, MapFile.INDEX_FILE_NAME));
    assertPathExists("data file in " + expectedMapDir,
        new Path(expectedMapDir, MapFile.DATA_FILE_NAME));
  }

  /**
   * Full test of the expected lifecycle: start job, task, write, commit task,
   * commit job.
   * @throws Exception on a failure
   */
  @Test
  public void testCommitLifecycle() throws Exception {
    describe("Full test of the expected lifecycle:\n" +
        " start job, task, write, commit task, commit job.\n" +
        "Verify:\n" +
        "* no files are visible after task commit\n" +
        "* the expected file is visible after job commit\n");
    JobData jobData = startJob(false);
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;
    assertCommitterFactoryIsManifestCommitter(tContext,
        tContext.getWorkingDirectory());
    validateTaskAttemptWorkingDirectory(committer, tContext);

    // write output
    describe("1. Writing output");
    final Path textOutputPath = writeTextOutput(tContext);
    describe("Output written to %s", textOutputPath);

    describe("2. Committing task");
    Assertions.assertThat(committer.needsTaskCommit(tContext))
        .as("No files to commit were found by " + committer)
        .isTrue();
    commitTask(committer, tContext);
    final TaskManifest taskManifest = requireNonNull(
        committer.getTaskAttemptCommittedManifest(), "committerTaskManifest");
    final String manifestJSON = taskManifest.toJson();
    LOG.info("Task manifest {}", manifestJSON);
    int filesCreated = 1;
    Assertions.assertThat(taskManifest.getFilesToCommit())
        .describedAs("Files to commit in task manifest %s", manifestJSON)
        .hasSize(filesCreated);
    Assertions.assertThat(taskManifest.getDestDirectories())
        .describedAs("Directories to create in task manifest %s",
            manifestJSON)
        .isEmpty();

    // this is only task commit; there MUST be no part- files in the dest dir
    try {
      RemoteIterators.foreach(getFileSystem().listFiles(outputDir, false),
          (status) ->
              Assertions.assertThat(status.getPath().toString())
                  .as("task committed file to dest :" + status)
                  .contains("part"));
    } catch (FileNotFoundException ignored) {
      log().info("Outdir {} is not created by task commit phase ",
          outputDir);
    }

    describe("3. Committing job");

    commitJob(committer, jContext);

    // validate output
    describe("4. Validating content");
    String jobUniqueId = jobData.jobId();
    ManifestSuccessData successData = validateContent(outputDir,
        true,
        jobUniqueId);
    // look in the SUMMARY
    Assertions.assertThat(successData.getDiagnostics())
        .describedAs("Stage entry in SUCCESS")
        .containsEntry(STAGE, OP_STAGE_JOB_COMMIT);
    IOStatisticsSnapshot jobStats = successData.getIOStatistics();
    // manifest
    verifyStatisticCounterValue(jobStats,
        OP_LOAD_MANIFEST, 1);
    FileStatus st = getFileSystem().getFileStatus(getPart0000(outputDir));
    verifyStatisticCounterValue(jobStats,
        COMMITTER_FILES_COMMITTED_COUNT, filesCreated);
    verifyStatisticCounterValue(jobStats,
        COMMITTER_BYTES_COMMITTED_COUNT, st.getLen());

    // now load and examine the job report.
    // this MUST contain all the stats of the summary, plus timings on
    // job commit itself

    ManifestSuccessData report = loadReport(jobUniqueId, true);
    Map<String, String> diag = report.getDiagnostics();
    Assertions.assertThat(diag)
        .describedAs("Stage entry in report")
        .containsEntry(STAGE, OP_STAGE_JOB_COMMIT);
    IOStatisticsSnapshot reportStats = report.getIOStatistics();
    verifyStatisticCounterValue(reportStats,
        OP_LOAD_MANIFEST, 1);
    verifyStatisticCounterValue(reportStats,
        OP_STAGE_JOB_COMMIT, 1);
    verifyStatisticCounterValue(reportStats,
        COMMITTER_FILES_COMMITTED_COUNT, filesCreated);
    verifyStatisticCounterValue(reportStats,
        COMMITTER_BYTES_COMMITTED_COUNT, st.getLen());

  }

  /**
   * Load a summary from the report dir.
   * @param jobUniqueId job ID
   * @param expectSuccess is the job expected to have succeeded.
   * @throws IOException failure to load
   * @return the report
   */
  private ManifestSuccessData loadReport(String jobUniqueId,
      boolean expectSuccess) throws IOException {
    File file = new File(getReportDir(),
        createJobSummaryFilename(jobUniqueId));
    ContractTestUtils.assertIsFile(FileSystem.getLocal(getConfiguration()),
        new Path(file.toURI()));
    ManifestSuccessData report = ManifestSuccessData.serializer().load(file);
    LOG.info("Report for job {}:\n{}", jobUniqueId, report.toJson());
    Assertions.assertThat(report.getSuccess())
        .describedAs("success flag in report")
        .isEqualTo(expectSuccess);
    return report;
  }

  /**
   * Repeated commit call after job commit.
   */
  @Test
  public void testCommitterWithDuplicatedCommit() throws Exception {
    describe("Call a task then job commit twice;" +
        "expect the second task commit to fail.");
    JobData jobData = startJob(true);
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;

    // do commit
    describe("committing task");
    committer.commitTask(tContext);

    // repeated commit while TA dir exists fine/idempotent
    committer.commitTask(tContext);

    describe("committing job");
    committer.commitJob(jContext);
    describe("commit complete\n");

    describe("cleanup");
    committer.cleanupJob(jContext);
    // validate output
    validateContent(outputDir, shouldExpectSuccessMarker(),
        committer.getJobUniqueId());

    // commit task to fail on retry as task attempt dir doesn't exist
    describe("Attempting commit of the same task after job commit -expecting failure");
    expectFNFEonTaskCommit(committer, tContext);
  }

  /**
   * HADOOP-17258. If a second task attempt is committed, it
   * must succeed, and the output of the first TA, even if already
   * committed, MUST NOT be visible in the final output.
   * <p></p>
   * What's important is not just that only one TA must succeed,
   * but it must be the last one executed.
   */
  @Test
  public void testTwoTaskAttemptsCommit() throws Exception {
    describe("Commit two task attempts;" +
        " expect the second attempt to succeed.");
    JobData jobData = startJob(false);
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;
    // do commit
    describe("\ncommitting task");
    // write output for TA 1,
    Path outputTA1 = writeTextOutput(tContext);

    // speculatively execute committer 2.

    // jobconf with a different base to its parts.
    Configuration conf2 = jobData.conf;
    conf2.set("mapreduce.output.basename", "attempt2");
    String attempt2 = "attempt_" + jobId + "_m_000000_1";
    TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
        conf2, ta2);

    ManifestCommitter committer2 = localCommitterFactory
        .createCommitter(tContext2);
    setupCommitter(committer2, tContext2);

    // verify working dirs are different
    Assertions.assertThat(committer.getWorkPath())
        .describedAs("Working dir of %s", committer)
        .isNotEqualTo(committer2.getWorkPath());

    // write output for TA 2,
    Path outputTA2 = writeTextOutput(tContext2);

    // verify the names are different.
    String name1 = outputTA1.getName();
    String name2 = outputTA2.getName();
    Assertions.assertThat(name1)
        .describedAs("name of task attempt output %s", outputTA1)
        .isNotEqualTo(name2);

    // commit task 1
    committer.commitTask(tContext);

    // then pretend that task1 didn't respond, so
    // commit task 2
    committer2.commitTask(tContext2);

    // and the job
    committer2.commitJob(tContext);

    // validate output
    FileSystem fs = getFileSystem();
    ManifestSuccessData successData = validateSuccessFile(fs, outputDir,
        1,
        "");
    Assertions.assertThat(successData.getFilenames())
        .describedAs("Files committed")
        .hasSize(1);

    assertPathExists("attempt2 output", new Path(outputDir, name2));
    assertPathDoesNotExist("attempt1 output", new Path(outputDir, name1));

  }

  protected boolean shouldExpectSuccessMarker() {
    return true;
  }

  /**
   * Simulate a failure on the first job commit; expect the
   * second to succeed.
   */
  /*@Test
  public void testCommitterWithFailure() throws Exception {
    describe("Fail the first job commit then retry");
    JobData jobData = startJob(new FailingCommitterFactory(), true);
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;

    // do commit
    committer.commitTask(tContext);

    // now fail job
    expectSimulatedFailureOnJobCommit(jContext, committer);

    commitJob(committer, jContext);

    // but the data got there, due to the order of operations.
    validateContent(outDir, shouldExpectSuccessMarker(),
        committer.getUUID());
    expectJobCommitToFail(jContext, committer);
  }
*/

  /**
   * Override point: the failure expected on the attempt to commit a failed
   * job.
   * @param jContext job context
   * @param committer committer
   * @throws Exception any unexpected failure.
   */
  protected void expectJobCommitToFail(JobContext jContext,
      ManifestCommitter committer) throws Exception {
    // next attempt will fail as there is no longer a directory to commit
    expectJobCommitFailure(jContext, committer,
        FileNotFoundException.class);
  }

  /**
   * Expect a job commit operation to fail with a specific exception.
   * @param jContext job context
   * @param committer committer
   * @param clazz class of exception
   * @return the caught exception
   * @throws Exception any unexpected failure.
   */
  protected static <E extends IOException> E expectJobCommitFailure(
      JobContext jContext,
      ManifestCommitter committer,
      Class<E> clazz)
      throws Exception {

    return intercept(clazz,
        () -> {
          committer.commitJob(jContext);
          return committer.toString();
        });
  }

  protected static void expectFNFEonTaskCommit(
      ManifestCommitter committer,
      TaskAttemptContext tContext) throws Exception {
    intercept(FileNotFoundException.class,
        () -> {
          committer.commitTask(tContext);
          return committer.toString();
        });
  }

  /**
   * Commit a task with no output.
   * Dest dir should exist.
   */
  @Test
  public void testCommitterWithNoOutputs() throws Exception {
    describe("Have a task and job with no outputs: expect success");
    JobData jobData = startJob(localCommitterFactory, false);
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;

    // do commit
    committer.commitTask(tContext);
    Path attemptPath = committer.getTaskAttemptPath(tContext);
    ContractTestUtils.assertPathExists(
        attemptPath.getFileSystem(tContext.getConfiguration()),
        "task attempt dir",
        attemptPath);
  }


  @Test
  public void testMapFileOutputCommitter() throws Exception {
    describe("Test that the committer generates map output into a directory\n" +
        "starting with the prefix part-");
    JobData jobData = startJob(false);
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;
    Configuration conf = jobData.conf;

    // write output
    writeMapFileOutput(new MapFileOutputFormat()
            .getRecordWriter(tContext), tContext);

    // do commit
    commitTaskAndJob(committer, jContext, tContext);
    FileSystem fs = getFileSystem();

    lsR(fs, outputDir, true);
    String ls = ls(outputDir);
    describe("\nvalidating");

    // validate output
    verifySuccessMarker(outputDir, committer.getJobUniqueId());

    describe("validate output of %s", outputDir);
    validateMapFileOutputContent(fs, outputDir);

    // Ensure getReaders call works and also ignores
    // hidden filenames (_ or . prefixes)
    describe("listing");
    FileStatus[] filtered = fs.listStatus(outputDir, HIDDEN_FILE_FILTER);
    Assertions.assertThat(filtered)
        .describedAs("listed children under %s", ls)
        .hasSize(1);
    FileStatus fileStatus = filtered[0];
    Assertions.assertThat(fileStatus.getPath().getName())
        .as("Not the part file: " + fileStatus)
        .startsWith(PART_00000);

    describe("getReaders()");
    Assertions.assertThat(getReaders(fs, outputDir, conf))
        .describedAs("getReaders() MapFile.Reader entries with shared FS %s %s", outputDir, ls)
        .hasSize(1);

    describe("getReaders(new FS)");
    FileSystem fs2 = FileSystem.get(outputDir.toUri(), conf);
    Assertions.assertThat(getReaders(fs2, outputDir, conf))
        .describedAs("getReaders(new FS) %s %s", outputDir, ls)
        .hasSize(1);

    describe("MapFileOutputFormat.getReaders");
    Assertions.assertThat(MapFileOutputFormat.getReaders(outputDir, conf))
        .describedAs("MapFileOutputFormat.getReaders(%s) %s", outputDir, ls)
        .hasSize(1);

  }

  /** Open the output generated by this format. */
  @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
  private static MapFile.Reader[] getReaders(FileSystem fs,
      Path dir,
      Configuration conf) throws IOException {
    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, HIDDEN_FILE_FILTER));

    // sort names, so that hash partitioning works
    Arrays.sort(names);

    MapFile.Reader[] parts = new MapFile.Reader[names.length];
    for (int i = 0; i < names.length; i++) {
      parts[i] = new MapFile.Reader(names[i], conf);
    }
    return parts;
  }

  public static final PathFilter HIDDEN_FILE_FILTER = (path) ->
      !path.getName().startsWith("_") && !path.getName().startsWith(".");

  /**
   * A functional interface which an action to test must implement.
   */
  @FunctionalInterface
  public interface ActionToTest {

    void exec(Job job, JobContext jContext, TaskAttemptContext tContext,
        ManifestCommitter committer) throws Exception;
  }

  @Test
  public void testAbortTaskNoWorkDone() throws Exception {
    executeWork("abort task no work",
        (job, jContext, tContext, committer) ->
            committer.abortTask(tContext));
  }

  @Test
  public void testAbortJobNoWorkDone() throws Exception {
    executeWork("abort task no work",
        (job, jContext, tContext, committer) ->
            committer.abortJob(jContext, JobStatus.State.RUNNING));
  }

  @Test
  public void testCommitJobButNotTask() throws Exception {
    executeWork("commit a job while a task's work is pending, " +
            "expect task writes to be cancelled.",
        (job, jContext, tContext, committer) -> {
          // step 1: write the text
          writeTextOutput(tContext);
          // step 2: commit the job
          createCommitter(tContext).commitJob(tContext);
          // verify that no output can be observed
          assertPart0000DoesNotExist(outputDir);
        }
    );
  }

  @Test
  public void testAbortTaskThenJob() throws Exception {
    JobData jobData = startJob(true);
    ManifestCommitter committer = jobData.committer;

    // do abort
    committer.abortTask(jobData.tContext);

    intercept(FileNotFoundException.class, "",
        () -> getPart0000(committer.getWorkPath()));

    committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
    assertJobAbortCleanedUp(jobData);

  }

  /**
   * Extension point: assert that the job was all cleaned up after an abort.
   * Base assertions
   * <ul>
   *   <li>Output dir is absent or, if present, empty</li>
   * </ul>
   * @param jobData job data
   * @throws Exception failure
   */
  public void assertJobAbortCleanedUp(JobData jobData) throws Exception {
    FileSystem fs = getFileSystem();
    try {
      FileStatus[] children = listChildren(fs, outputDir);
      if (children.length != 0) {
        lsR(fs, outputDir, true);
      }
      Assertions.assertThat(children)
          .as("Output directory not empty " + ls(outputDir))
          .containsExactly(new FileStatus[0]);
    } catch (FileNotFoundException e) {
      // this is a valid state; it means the dest dir doesn't exist yet.
    }

  }

  @Test
  public void testFailAbort() throws Exception {
    describe("Abort the task, then job (failed), abort the job again");
    JobData jobData = startJob(true);
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;
    ManifestCommitter committer = jobData.committer;

    // do abort
    committer.abortTask(tContext);

    committer.getJobAttemptPath(jContext);
    committer.getTaskAttemptPath(tContext);
    assertPart0000DoesNotExist(outputDir);
    assertSuccessMarkerDoesNotExist(outputDir);
    describe("Aborting job into %s", outputDir);

    committer.abortJob(jContext, JobStatus.State.FAILED);

    assertTaskAttemptPathDoesNotExist(committer, tContext);
    assertJobAttemptPathDoesNotExist(committer, jContext);

    // verify a failure report
    ManifestSuccessData report = loadReport(jobData.jobId(), false);
    Map<String, String> diag = report.getDiagnostics();
    Assertions.assertThat(diag)
        .describedAs("Stage entry in report")
        .containsEntry(STAGE, OP_STAGE_JOB_ABORT);
    IOStatisticsSnapshot reportStats = report.getIOStatistics();
    verifyStatisticCounterValue(reportStats,
        OP_STAGE_JOB_ABORT, 1);

    // try again; expect abort to be idempotent.
    committer.abortJob(jContext, JobStatus.State.FAILED);

  }

  /**
   * Assert that the given dir does not have the {@code _SUCCESS} marker.
   * @param dir dir to scan
   * @throws IOException IO Failure
   */
  protected void assertSuccessMarkerDoesNotExist(Path dir) throws IOException {
    assertPathDoesNotExist("Success marker",
        new Path(dir, SUCCESS_MARKER));
  }

  public void assertPart0000DoesNotExist(Path dir) throws Exception {
    intercept(FileNotFoundException.class,
        () -> getPart0000(dir));
    assertPathDoesNotExist("expected output file", new Path(dir, PART_00000));
  }

  @Test
  public void testAbortJobNotTask() throws Exception {
    executeWork("abort task no work",
        (job, jContext, tContext, committer) -> {
          // write output
          writeTextOutput(tContext);
          committer.abortJob(jContext, JobStatus.State.RUNNING);
          assertTaskAttemptPathDoesNotExist(
              committer, tContext);
          assertJobAttemptPathDoesNotExist(
              committer, jContext);
        });
  }

  /**
   * This looks at what happens with concurrent commits.
   * However, the failure condition it looks for (subdir under subdir)
   * is the kind of failure you see on a rename-based commit.
   *
   * What it will not detect is the fact that both tasks will each commit
   * to the destination directory. That is: whichever commits last wins.
   *
   * There's no way to stop this. Instead it is a requirement that the task
   * commit operation is only executed when the committer is happy to
   * commit only those tasks which it knows have succeeded, and abort those
   * which have not.
   * @throws Exception failure
   */
  @Test
  public void testConcurrentCommitTaskWithSubDir() throws Exception {
    Job job = newJob();
    FileOutputFormat.setOutputPath(job, outputDir);
    final Configuration conf = job.getConfiguration();

    final JobContext jContext =
        new JobContextImpl(conf, taskAttempt0.getJobID());
    ManifestCommitter amCommitter = createCommitter(
        new TaskAttemptContextImpl(conf, taskAttempt0));
    amCommitter.setupJob(jContext);

    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
    taCtx[0] = new TaskAttemptContextImpl(conf, taskAttempt0);
    taCtx[1] = new TaskAttemptContextImpl(conf, taskAttempt1);

    // IDE/checkstyle complain here about type casting but they
    // are confused.
    final TextOutputFormat<Writable, Object>[] tof =
        new TextOutputForTests[2];

    for (int i = 0; i < tof.length; i++) {
      tof[i] = new TextOutputForTests<Writable, Object>() {
        @Override
        public Path getDefaultWorkFile(
            TaskAttemptContext context,
            String extension) throws IOException {
          final ManifestCommitter foc = (ManifestCommitter)
              getOutputCommitter(context);
          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
              getUniqueFile(context, getOutputName(context), extension));
        }
      };
    }

    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
    try {
      for (int i = 0; i < taCtx.length; i++) {
        final int taskIdx = i;
        executor.submit(() -> {
          final OutputCommitter outputCommitter =
              tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
          outputCommitter.setupTask(taCtx[taskIdx]);
          writeOutput(tof[taskIdx].getRecordWriter(taCtx[taskIdx]), taCtx[taskIdx]);
          describe("Committing Task %d", taskIdx);
          outputCommitter.commitTask(taCtx[taskIdx]);
          return null;
        });
      }
    } finally {
      executor.shutdown();
      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
        log().info("Awaiting thread termination!");
      }
    }

    // if we commit here then all tasks will be committed, so there will
    // be contention for that final directory: both parts will go in.

    describe("\nCommitting Job");
    amCommitter.commitJob(jContext);
    assertPathExists("base output directory", outputDir);
    assertPart0000DoesNotExist(outputDir);
    Path outSubDir = new Path(outputDir, SUB_DIR);
    assertPathDoesNotExist("Must not end up with sub_dir/sub_dir",
        new Path(outSubDir, SUB_DIR));

    // validate output
    // There's no success marker in the subdirectory
    validateContent(outSubDir, false, "");
  }

  @Test
  public void testUnsupportedSchema() throws Throwable {
    intercept(PathIOException.class, () ->
        new ManifestCommitterFactory()
            .createOutputCommitter(new Path("s3a://unsupported/"), null));
  }

  /**
   * Factory for failing committers.
   */


/*
  protected ManifestCommitter createFailingCommitter(
  final TaskAttemptContext tContext)
      throws IOException {
    //     TODO
    return null;
  }

  public class FailingCommitterFactory implements CommitterFactory {

    @Override
    public ManifestCommitter createCommitter(TaskAttemptContext context)
        throws IOException {
      return createFailingCommitter(context);
    }
  }*/
  @Test
  public void testOutputFormatIntegration() throws Throwable {
    Configuration conf = getConfiguration();
    Job job = newJob();
    assertCommitterFactoryIsManifestCommitter(job, outputDir);
    job.setOutputFormatClass(TextOutputForTests.class);
    conf = job.getConfiguration();
    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
    JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID());
    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
        taskAttempt0);
    TextOutputForTests<IntWritable, IntWritable> outputFormat =
        (TextOutputForTests<IntWritable, IntWritable>)
            ReflectionUtils.newInstance(tContext.getOutputFormatClass(), conf);
    ManifestCommitter committer = (ManifestCommitter)
        outputFormat.getOutputCommitter(tContext);

    // check path capabilities directly
    Assertions.assertThat(committer.hasCapability(
            ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
        .describedAs("dynamic partitioning capability in committer %s",
            committer)
        .isTrue();
    // and through a binding committer -passthrough is critical
    // for the spark binding.
    BindingPathOutputCommitter bindingCommitter =
        new BindingPathOutputCommitter(outputDir, tContext);
    Assertions.assertThat(bindingCommitter.hasCapability(
            ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
        .describedAs("dynamic partitioning capability in committer %s",
            bindingCommitter)
        .isTrue();


    // setup
    JobData jobData = new JobData(job, jContext, tContext, committer);
    setupJob(jobData);
    abortInTeardown(jobData);
    TextOutputForTests.LoggingLineRecordWriter<IntWritable, IntWritable> recordWriter
        = outputFormat.getRecordWriter(tContext);
    IntWritable iw = new IntWritable(1);
    recordWriter.write(iw, iw);
    long expectedLength = 4;
    Path dest = recordWriter.getDest();
    validateTaskAttemptPathDuringWrite(dest, expectedLength);
    recordWriter.close(tContext);
    // at this point
    validateTaskAttemptPathAfterWrite(dest, expectedLength);
    Assertions.assertThat(committer.needsTaskCommit(tContext))
        .as("Committer does not have data to commit " + committer)
        .isTrue();
    commitTask(committer, tContext);
    // at this point the committer tasks stats should be current.
    IOStatisticsSnapshot snapshot = new IOStatisticsSnapshot(
        committer.getIOStatistics());
    String commitsCompleted = COMMITTER_TASKS_COMPLETED_COUNT;
    LOG.info("after task commit {}", ioStatisticsToPrettyString(snapshot));
    verifyStatisticCounterValue(snapshot,
        commitsCompleted, 1);
    final TaskManifest manifest = loadManifest(
        committer.getTaskManifestPath(tContext));
    LOG.info("Manifest {}", manifest.toJson());

    commitJob(committer, jContext);
    LOG.info("committer iostatistics {}",
        ioStatisticsSourceToString(committer));

    // validate output
    ManifestSuccessData successData = verifySuccessMarker(outputDir,
        committer.getJobUniqueId());

    // the task commit count should get through the job commit
    IOStatisticsSnapshot successStats = successData.getIOStatistics();
    LOG.info("loaded statistics {}", successStats);
    verifyStatisticCounterValue(successStats,
        commitsCompleted, 1);
  }

  /**
   * Create a committer through reflection then use it to abort
   * a task. This mimics the action of an AM when a container fails and
   * the AM wants to abort the task attempt.
   */
  @Test
  public void testAMWorkflow() throws Throwable {
    describe("Create a committer with a null output path & use as an AM");
    JobData jobData = startJob(true);
    JobContext jContext = jobData.jContext;
    TaskAttemptContext tContext = jobData.tContext;

    TaskAttemptContext newAttempt = new TaskAttemptContextImpl(
        jContext.getConfiguration(),
        taskAttempt0);
    Configuration conf = jContext.getConfiguration();

    // bind
    TextOutputForTests.bind(conf);

    OutputFormat<?, ?> outputFormat
        = ReflectionUtils.newInstance(newAttempt.getOutputFormatClass(), conf);
    Path outputPath = FileOutputFormat.getOutputPath(newAttempt);
    Assertions.assertThat(outputPath)
        .as("null output path in new task attempt")
        .isNotNull();

    ManifestCommitter committer2 = (ManifestCommitter)
        outputFormat.getOutputCommitter(newAttempt);
    committer2.abortTask(tContext);

  }

  /**
   * Make sure that two jobs in parallel directory trees coexist.
   * Note: the two jobs are not trying to write to the same
   * output directory.
   * That should be possible, but cleanup must be disabled.
   */
  @Test
  public void testParallelJobsToAdjacentPaths() throws Throwable {

    describe("Run two jobs in parallel, assert they both complete");
    JobData jobData = startJob(true);
    Job job1 = jobData.job;
    ManifestCommitter committer1 = jobData.committer;
    JobContext jContext1 = jobData.jContext;
    TaskAttemptContext tContext1 = jobData.tContext;

    // now build up a second job
    String jobId2 = randomJobId();
    String attempt20 = "attempt_" + jobId2 + "_m_000000_0";
    TaskAttemptID taskAttempt20 = TaskAttemptID.forName(attempt20);
    String attempt21 = "attempt_" + jobId2 + "_m_000001_0";
    TaskAttemptID taskAttempt21 = TaskAttemptID.forName(attempt21);

    Path job1Dest = outputDir;
    Path job2Dest = new Path(getOutputDir().getParent(),
        getMethodName() + "job2Dest");
    // little safety check
    Assertions.assertThat(job2Dest)
        .describedAs("Job destinations")
        .isNotEqualTo(job1Dest);

    // create the second job
    Job job2 = newJob(job2Dest,
        unsetUUIDOptions(new JobConf(getConfiguration())),
        attempt20);
    Configuration conf2 = job2.getConfiguration();
    conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
    ManifestCommitter committer2 = null;
    try {
      JobContext jContext2 = new JobContextImpl(conf2,
          taskAttempt20.getJobID());
      TaskAttemptContext tContext2 =
          new TaskAttemptContextImpl(conf2, taskAttempt20);
      committer2 = createCommitter(job2Dest, tContext2);
      JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
      setupJob(jobData2);
      abortInTeardown(jobData2);
      // make sure the directories are different
      Assertions.assertThat(committer1.getOutputPath())
          .describedAs("Committer output path of %s and %s", committer1, committer2)
          .isNotEqualTo(committer2.getOutputPath());
      // and job IDs
      Assertions.assertThat(committer1.getJobUniqueId())
          .describedAs("JobUnique IDs of %s and %s", committer1, committer2)
          .isNotEqualTo(committer2.getJobUniqueId());

      // job2 setup, write some data there
      writeTextOutput(tContext2);

      // at this point, job1 and job2 both have uncommitted tasks

      // commit tasks in order task 2, task 1.
      commitTask(committer2, tContext2);
      commitTask(committer1, tContext1);

      // commit jobs in order job 1, job 2
      commitJob(committer1, jContext1);

      getPart0000(job1Dest);

      commitJob(committer2, jContext2);
      getPart0000(job2Dest);

    } finally {
      // clean things up in test failures.
      FileSystem fs = getFileSystem();
      if (committer1 != null) {
        fs.delete(committer1.getOutputPath(), true);
      }
      if (committer2 != null) {
        fs.delete(committer2.getOutputPath(), true);
      }
    }

  }

  /**
   * Strip staging/spark UUID options.
   * @param conf config
   * @return the patched config
   */
  protected Configuration unsetUUIDOptions(final Configuration conf) {
    conf.unset(SPARK_WRITE_UUID);
    return conf;
  }

  /**
   * Assert that a committer's job attempt path exists.
   * For the staging committers, this is in the cluster FS.
   * @param committer committer
   * @param jobContext job context
   * @throws IOException failure
   */
  protected void assertJobAttemptPathExists(
      final ManifestCommitter committer,
      final JobContext jobContext) throws IOException {
    Path attemptPath = committer.getJobAttemptPath(jobContext);
    ContractTestUtils.assertIsDirectory(
        attemptPath.getFileSystem(committer.getConf()),
        attemptPath);
  }

  /**
   * Validate the path of a file being written to during the write
   * itself.
   * @param p path
   * @param expectedLength
   * @throws IOException IO failure
   */
  protected void validateTaskAttemptPathDuringWrite(Path p,
      final long expectedLength) throws IOException {

  }

  /**
   * Validate the path of a file being written to after the write
   * operation has completed.
   * @param p path
   * @param expectedLength
   * @throws IOException IO failure
   */
  protected void validateTaskAttemptPathAfterWrite(Path p,
      final long expectedLength) throws IOException {

  }

  /**
   * Perform any actions needed to validate the working directory of
   * a committer.
   * For example: filesystem, path attributes
   * @param committer committer instance
   * @param context task attempt context
   * @throws IOException IO failure
   */
  protected void validateTaskAttemptWorkingDirectory(
      ManifestCommitter committer,
      TaskAttemptContext context) throws IOException {
  }

  /**
   * Commit a task then validate the state of the committer afterwards.
   * @param committer committer
   * @param tContext task context
   * @throws IOException IO failure
   */
  protected void commitTask(final ManifestCommitter committer,
      final TaskAttemptContext tContext) throws IOException {
    committer.commitTask(tContext);
  }

  /**
   * Commit a job then validate the state of the committer afterwards.
   * @param committer committer
   * @param jContext job context
   * @throws IOException IO failure
   */
  protected void commitJob(final ManifestCommitter committer,
      final JobContext jContext) throws IOException {
    committer.commitJob(jContext);

  }

}