// JobSuite.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.commit.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.commit.BaseJobSuite;
import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
import org.apache.hadoop.fs.tosfs.commit.SuccessData;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.net.NetUtils;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Test helper for exercising a {@link Committer} through the
 * {@code org.apache.hadoop.mapred} API.
 *
 * <p>An instance bundles one job context, one task attempt context and one committer, and
 * exposes the individual commit protocol steps (set up, write, commit, abort) together with
 * assertions over the resulting output.
 */
public final class JobSuite extends BaseJobSuite {
  private final JobContext jobContext;
  private final TaskAttemptContext taskAttemptContext;
  private final Committer committer;

  /**
   * Builds the suite: job, contexts, output location, object storage and committer.
   *
   * <p>Note that the given {@code conf} is modified: the task attempt settings and the job
   * output path are written into it.
   *
   * @param fs the file system used for writing and checking output.
   * @param conf the job configuration; mutated by this constructor.
   * @param taskAttemptId the task attempt the suite acts as.
   * @param appAttemptId the application attempt id stored in the configuration.
   * @param outputPath the job output directory.
   * @throws IOException if creating the job or setting up the task fails.
   */
  private JobSuite(FileSystem fs, JobConf conf,
                   TaskAttemptID taskAttemptId, int appAttemptId, Path outputPath)
      throws IOException {
    setFs(fs);
    // Initialize the job instance.
    setJob(Job.getInstance(conf));
    job().setJobID(JobID.forName(CommitUtils.buildJobId(conf, taskAttemptId.getJobID())));
    this.jobContext = createJobContext(conf, taskAttemptId);
    this.taskAttemptContext = createTaskAttemptContext(conf, taskAttemptId, appAttemptId);
    setJobId(CommitUtils.buildJobId(jobContext));

    // Set job output directory.
    FileOutputFormat.setOutputPath(conf, outputPath);
    setOutputPath(outputPath);
    setObjectStorage(ObjectStorageFactory.create(outputPath.toUri().getScheme(),
        outputPath.toUri().getAuthority(), conf));

    // Initialize committer. The task is set up right away with the suite's own context.
    this.committer = new Committer();
    this.committer.setupTask(taskAttemptContext);
  }

  /**
   * Creates a suite for the given task attempt using {@code DEFAULT_APP_ATTEMPT_ID}.
   *
   * @param conf the base configuration; it is copied into a new {@link JobConf}.
   * @param taskAttemptId the task attempt the suite acts as.
   * @param outDir the job output directory; its file system is resolved from {@code conf}.
   * @return the new suite.
   * @throws IOException if the file system or the suite cannot be created.
   */
  public static JobSuite create(Configuration conf, TaskAttemptID taskAttemptId, Path outDir)
      throws IOException {
    FileSystem fs = outDir.getFileSystem(conf);
    return new JobSuite(fs, new JobConf(conf), taskAttemptId, DEFAULT_APP_ATTEMPT_ID, outDir);
  }

  /**
   * Parses a task attempt id of the form {@code attempt_<trimmedJobId>_m_000000_<attemptId>}.
   *
   * @param trimmedJobId the job identifier portion placed right after {@code attempt_}.
   * @param attemptId the attempt number placed at the end of the id.
   * @return the parsed task attempt id.
   */
  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int attemptId) {
    String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, attemptId);
    return TaskAttemptID.forName(attempt);
  }

  /**
   * Creates a job context for the job that the given task attempt belongs to.
   *
   * @param jobConf the job configuration.
   * @param taskAttemptId the task attempt whose job id is used.
   * @return the new job context.
   */
  public static JobContext createJobContext(JobConf jobConf, TaskAttemptID taskAttemptId) {
    return new JobContextImpl(jobConf, taskAttemptId.getJobID());
  }

  /**
   * Creates a task attempt context after recording the attempt in the configuration.
   *
   * <p>The task attempt id, the application attempt id and the output committer class
   * ({@link Committer}) are written into {@code jobConf} before the context is built.
   *
   * @param jobConf the job configuration; mutated by this method.
   * @param taskAttemptId the task attempt id.
   * @param appAttemptId the application attempt id.
   * @return the new task attempt context.
   * @throws IOException declared for callers; not thrown by the visible statements.
   */
  public static TaskAttemptContext createTaskAttemptContext(
      JobConf jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws IOException {
    // Set the key values for job configuration.
    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString());
    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
    jobConf.set("mapred.output.committer.class",
        Committer.class.getName()); // 2x and 3x newApiCommitter=false.
    return new TaskAttemptContextImpl(jobConf, taskAttemptId);
  }

  /**
   * Sets up the job through the committer using the suite's job context.
   *
   * @throws IOException if the committer fails.
   */
  public void setupJob() throws IOException {
    committer.setupJob(jobContext);
  }

  /**
   * Sets up the task through the committer using the suite's own task attempt context.
   *
   * @throws IOException if the committer fails.
   */
  public void setupTask() throws IOException {
    committer.setupTask(taskAttemptContext);
  }

  /**
   * Sets up the task with a caller-supplied context. This simulates the scenario that the job
   * may set up a task with a different taskAttemptContext, e.g., for a spark job.
   *
   * @param taskAttemptCxt the task attempt context to set up.
   * @throws IOException if the committer fails.
   */
  public void setupTask(TaskAttemptContext taskAttemptCxt) throws IOException {
    committer.setupTask(taskAttemptCxt);
  }

  /**
   * Writes the fixed set of test records using the suite's own task attempt context.
   *
   * @throws Exception if writing or closing the record writer fails.
   */
  public void writeOutput() throws Exception {
    writeOutput(taskAttemptContext);
  }

  /**
   * Writes the fixed set of test records with a caller-supplied context. This simulates the
   * scenario that the job may write task output with a different taskAttemptContext, e.g.,
   * for a spark job.
   *
   * @param taskAttemptCxt the task attempt context supplying the job conf, the output name
   *     and the progress callback.
   * @throws Exception if writing or closing the record writer fails.
   */
  public void writeOutput(TaskAttemptContext taskAttemptCxt) throws Exception {
    RecordWriter<Object, Object> writer = new TextOutputFormat<>().getRecordWriter(fs(),
        taskAttemptCxt.getJobConf(),
        CommitUtils.buildJobId(taskAttemptCxt),
        taskAttemptCxt.getProgressible());
    NullWritable nullKey = NullWritable.get();
    NullWritable nullVal = NullWritable.get();
    // Mix real, NullWritable and null keys/values in the written records.
    Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2};
    Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2};
    try {
      assertEquals(keys.length, vals.length);
      for (int i = 0; i < keys.length; i++) {
        writer.write(keys[i], vals[i]);
      }
    } finally {
      // Always close the writer, even when a write or the assertion above fails.
      writer.close(Reporter.NULL);
    }
  }

  /**
   * Asks the committer whether the suite's task attempt needs a commit.
   *
   * @return the committer's answer.
   * @throws IOException if the committer fails.
   */
  public boolean needsTaskCommit() throws IOException {
    return committer.needsTaskCommit(taskAttemptContext);
  }

  /**
   * Commits the task through the committer using the suite's own task attempt context.
   *
   * @throws IOException if the committer fails.
   */
  public void commitTask() throws IOException {
    committer.commitTask(taskAttemptContext);
  }

  /**
   * Commits the task with a caller-supplied context. This simulates the scenario that the job
   * may commit a task with a different taskAttemptContext, e.g., for a spark job.
   *
   * @param taskAttemptCxt the task attempt context to commit.
   * @throws IOException if the committer fails.
   */
  public void commitTask(TaskAttemptContext taskAttemptCxt) throws IOException {
    committer.commitTask(taskAttemptCxt);
  }

  /**
   * Aborts the task through the committer using the suite's own task attempt context.
   *
   * @throws IOException if the committer fails.
   */
  public void abortTask() throws IOException {
    committer.abortTask(taskAttemptContext);
  }

  /**
   * Commits the job through the committer using the suite's job context.
   *
   * @throws IOException if the committer fails.
   */
  public void commitJob() throws IOException {
    committer.commitJob(jobContext);
  }

  /**
   * Returns the path made of the committer's work path and the committer's job id.
   */
  @Override
  public Path magicPartPath() {
    return new Path(committer.getWorkPath(), committer.jobId());
  }

  /**
   * Returns the pending set path that {@link CommitUtils} derives for the suite's task
   * attempt under the output path.
   */
  @Override
  public Path magicPendingSetPath() {
    return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath());
  }

  /**
   * Returns the task attempt context owned by this suite.
   */
  public TaskAttemptContext taskAttemptContext() {
    return taskAttemptContext;
  }

  /**
   * Returns the committer owned by this suite.
   */
  public Committer committer() {
    return committer;
  }

  /**
   * Asserts that the task attempt base path exists neither in the file system nor as a key in
   * the object storage.
   *
   * @throws IOException if the file system lookup fails.
   */
  @Override
  public void assertNoTaskAttemptPath() throws IOException {
    Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, outputPath());
    assertFalse(fs().exists(path), "Task attempt path should be not existing");
    String pathToKey = ObjectUtils.pathToKey(path);
    assertNull(storage().head(pathToKey), "Should have no task attempt path key");
  }

  /**
   * Returns {@code true} when the bucket of the backing object storage reports itself as a
   * directory.
   */
  @Override
  protected boolean skipTests() {
    return storage().bucket().isDirectory();
  }

  /**
   * Asserts that the success marker exists under the output path and carries the expected
   * data for this job and task attempt.
   *
   * @throws IOException if the marker cannot be read.
   */
  @Override
  public void assertSuccessMarker() throws IOException {
    Path succPath = CommitUtils.successMarker(outputPath());
    assertTrue(fs().exists(succPath), String.format("%s should be exists", succPath));
    SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs(), succPath));
    assertCommonSuccessData(successData);
  }

  /**
   * Asserts that the summary report for this job exists in {@code reportDir}, carries the
   * expected data, and records the {@code "clean"} stage in its diagnostics.
   *
   * @param reportDir the directory holding summary reports.
   * @throws IOException if the report cannot be read.
   */
  @Override
  public void assertSummaryReport(Path reportDir) throws IOException {
    Path reportPath = CommitUtils.summaryReport(reportDir, job().getJobID().toString());
    assertTrue(fs().exists(reportPath), String.format("%s should be exists", reportPath));
    SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs(), reportPath));
    assertCommonSuccessData(reportData);
    assertEquals("clean", reportData.diagnostics().get("stage"));
  }

  /**
   * Checks the fields shared by the success marker and the summary report: name, success
   * flag, hostname, committer name, description, job id and the single committed file name.
   *
   * @param data the deserialized success data to verify.
   * @throws IOException declared so that helpers invoked here may propagate I/O failures.
   */
  private void assertCommonSuccessData(SuccessData data) throws IOException {
    assertEquals(SuccessData.class.getName(), data.name());
    assertTrue(data.success());
    assertEquals(NetUtils.getHostname(), data.hostname());
    assertEquals(CommitUtils.COMMITTER_NAME, data.committer());
    assertEquals(
        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
        data.description());
    assertEquals(job().getJobID().toString(), data.jobId());
    // Exactly one part file is expected to have been committed.
    assertEquals(1, data.filenames().size());
    assertEquals(destPartKey(), data.filenames().get(0));
  }
}