CommitterTestBase.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.TestEnv;
import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
import org.apache.hadoop.fs.tosfs.commit.Pending;
import org.apache.hadoop.fs.tosfs.commit.PendingSet;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Abstract base class for committer tests written against the {@code org.apache.hadoop.mapred}
 * API. Concrete subclasses provide the {@link Configuration} through {@link #newConf()}.
 */
public abstract class CommitterTestBase {
// Configuration returned by newConf(); assigned in setup().
private Configuration conf;
// File system obtained through FileSystem.get(conf) in setup() and closed in teardown().
private FileSystem fs;
// Qualified per-test output directory "/test/<uuid>"; deleted recursively in teardown().
private Path outputPath;
// Task attempt id built in setup() from a freshly generated random job id.
private TaskAttemptID taskAttempt0;
// Qualified per-test directory "/report/<uuid>"; configured as the committer summary report dir.
private Path reportDir;
/**
 * Prepares the per-test fixture: a configuration, a file system, a unique output path, a task
 * attempt id and a freshly created report directory that is registered in the configuration.
 *
 * @throws IOException if the file system cannot be obtained or the report directory cannot be
 *     created.
 */
@BeforeEach
public void setup() throws IOException {
  conf = newConf();
  fs = FileSystem.get(conf);

  // One random id is shared by the output path and the report directory of this test.
  final String runId = UUIDUtils.random();
  final Path rawOutput = new Path("/test/" + runId);
  outputPath = fs.makeQualified(rawOutput);
  taskAttempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0);

  final Path rawReport = new Path("/report/" + runId);
  reportDir = fs.makeQualified(rawReport);
  fs.mkdirs(reportDir);
  final String reportUri = reportDir.toUri().toString();
  conf.set(org.apache.hadoop.fs.tosfs.commit.Committer.COMMITTER_SUMMARY_REPORT_DIR, reportUri);
}
/**
 * Creates the configuration used by the tests. {@link #setup()} passes the returned instance
 * to {@code FileSystem.get} and sets the committer summary report directory on it.
 *
 * @return the configuration to run the tests with.
 */
protected abstract Configuration newConf();
/**
 * Removes the directories created for the test through {@code CommonUtils.runQuietly} and then
 * closes the file system.
 *
 * <p>Both the output path and the report directory are removed; the report directory is
 * created by {@link #setup()} and would otherwise be left behind after every test.
 */
@AfterEach
public void teardown() {
  if (fs == null) {
    // setup() failed before the file system was obtained, so there is nothing to clean up.
    return;
  }
  for (Path dir : new Path[] {outputPath, reportDir}) {
    // A path is still null when setup() failed before assigning it.
    if (dir != null) {
      CommonUtils.runQuietly(() -> fs.delete(dir, true));
    }
  }
  IOUtils.closeStream(fs);
}
/**
 * Gates the test class on {@code TestEnv.checkTestEnabled()}: when it returns false the JUnit
 * assumption fails and the tests are not executed.
 */
@BeforeAll
public static void beforeClass() {
  final boolean testEnabled = TestEnv.checkTestEnabled();
  Assumptions.assumeTrue(testEnabled);
}
/**
 * Fails if any thread whose name starts with the committer thread prefix still exists once all
 * tests have run. Does nothing when {@code TestEnv.checkTestEnabled()} returns false.
 */
@AfterAll
public static void afterClass() {
  if (!TestEnv.checkTestEnabled()) {
    return;
  }

  List<String> committerThreads = Thread.getAllStackTraces().keySet()
      .stream()
      .map(Thread::getName)
      .filter(n -> n.startsWith(org.apache.hadoop.fs.tosfs.commit.Committer.THREADS_PREFIX))
      .collect(Collectors.toList());
  // Report the thread names so that a failure shows which threads were left behind.
  assertTrue(committerThreads.isEmpty(), "Outstanding committer threads: " + committerThreads);
}
/**
 * Builds a random job id without the {@code job_} prefix: today's date as {@code yyyyMMdd},
 * followed by two numbers joined with an underscore. Each number is drawn from [0, 1000) and
 * zero-padded to four digits.
 *
 * @return the generated id.
 */
private static String randomTrimmedJobId() {
  final String day = new SimpleDateFormat("yyyyMMdd").format(new Date());
  final long first = (long) (Math.random() * 1000);
  final long second = (long) (Math.random() * 1000);
  return String.format("%s%04d_%04d", day, first, second);
}
/**
 * Builds a random job id carrying the {@code job_} prefix.
 *
 * @return {@code "job_"} followed by the result of {@link #randomTrimmedJobId()}.
 */
private static String randomFormedJobId() {
  final String trimmedId = randomTrimmedJobId();
  return "job_" + trimmedId;
}
/**
 * Sets up a job, dumps the object storage and runs {@code JobSuite.assertHasMagicKeys()}.
 */
@Test
public void testSetupJob() throws IOException {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  final boolean skip = jobSuite.skipTests();
  Assumptions.assumeFalse(skip);

  // Job setup followed by the check on the magic keys.
  jobSuite.setupJob();
  jobSuite.dumpObjectStorage();
  jobSuite.assertHasMagicKeys();
}
/**
 * Creates a success marker and a populated job path before the job is set up, then checks that
 * after {@code setupJob()} the marker and the file below the job path are gone while the job
 * path itself exists.
 */
@Test
public void testSetupJobWithOrphanPaths() throws IOException, InterruptedException {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  // Leave a success marker behind.
  final Path orphanMarker = CommitUtils.successMarker(outputPath);
  CommitUtils.save(fs, orphanMarker, new byte[0]);
  assertTrue(fs.exists(orphanMarker), "The success file should exist.");

  // Leave a job path behind, with one file below it.
  final Path orphanJobDir = CommitUtils.magicJobPath(jobSuite.committer().jobId(), outputPath);
  fs.mkdirs(orphanJobDir);
  assertTrue(fs.exists(orphanJobDir), "The job path should exist.");

  final Path orphanPending = new Path(orphanJobDir, "tmp.pending");
  CommitUtils.save(fs, orphanPending, new byte[0]);
  assertTrue(fs.exists(orphanPending), "The sub path under job path should be existing.");

  // NOTE(review): this status is never read afterwards; presumably it and the sleep below were
  // meant for a modification-time comparison of the re-created job path - confirm.
  FileStatus jobPathStatus = fs.getFileStatus(orphanJobDir);
  Thread.sleep(1000L);

  jobSuite.setupJob();
  jobSuite.dumpObjectStorage();
  jobSuite.assertHasMagicKeys();

  assertFalse(fs.exists(orphanMarker), "Should have deleted the success path");
  assertTrue(fs.exists(orphanJobDir), "Should have re-created the job path");
  assertFalse(fs.exists(orphanPending), "Should have deleted the sub path under the job path");
}
/**
 * Checks that a file left below the task attempt base path is gone after {@code setupJob()},
 * and that a file re-created at the same place is gone again after {@code setupTask()}.
 */
@Test
public void testSetupTask() throws IOException {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  // Plant a leftover file below the task attempt base path before anything is set up.
  final Path attemptBase =
      CommitUtils.magicTaskAttemptBasePath(jobSuite.taskAttemptContext(), outputPath);
  final Path leftover = new Path(attemptBase, "tmp.pending");
  CommitUtils.save(fs, leftover, new byte[0]);
  assertTrue(fs.exists(attemptBase));
  assertTrue(fs.exists(leftover));

  // Job setup.
  jobSuite.setupJob();
  jobSuite.assertHasMagicKeys();

  // Neither the base path nor the leftover file may exist once the job has been set up.
  assertFalse(fs.exists(attemptBase));
  assertFalse(fs.exists(leftover));

  // Plant the leftover file a second time.
  CommitUtils.save(fs, leftover, new byte[0]);
  assertTrue(fs.exists(attemptBase));
  assertTrue(fs.exists(leftover));

  // Task setup must remove it again.
  jobSuite.setupTask();
  assertFalse(fs.exists(leftover));
}
/**
 * Writes the output of a single task and checks the pending file, commits the task and checks
 * the resulting pending set, then completes the multipart upload directly on the object
 * storage and verifies the part content.
 */
@Test
public void testCommitTask() throws Exception {
  JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(suite.skipTests());

  // Setup job
  suite.setupJob();
  suite.dumpObjectStorage();
  suite.assertHasMagicKeys();

  // Setup task
  suite.setupTask();

  // Write records.
  suite.assertNoMagicPendingFile();
  suite.assertMultipartUpload(0);
  suite.writeOutput();
  suite.dumpObjectStorage();
  suite.assertHasMagicPendingFile();
  suite.assertNoMagicMultipartUpload();
  suite.assertMultipartUpload(1);

  // Assert the pending file content.
  Path pendingPath = suite.magicPendingPath();
  byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
  Pending pending = Pending.deserialize(pendingData);
  assertEquals(suite.destPartKey(), pending.destKey());
  assertEquals(20, pending.length());
  assertEquals(1, pending.parts().size());

  // Commit the task.
  suite.commitTask();

  // Verify the pending set file.
  suite.assertHasPendingSet();

  // Assert the pending set file content.
  Path pendingSetPath = suite.magicPendingSetPath();
  byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath);
  PendingSet pendingSet = PendingSet.deserialize(pendingSetData);
  assertEquals(suite.job().getJobID().toString(), pendingSet.jobId());
  assertEquals(1, pendingSet.commits().size());
  assertEquals(pending, pendingSet.commits().get(0));
  // The expected value goes first so that a failure report labels the two sides correctly.
  assertEquals(
      ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID,
          suite.taskAttemptContext().getTaskAttemptID().toString()),
      pendingSet.extraData());

  // Complete the multipart upload and verify the results.
  ObjectStorage storage = suite.storage();
  storage.completeUpload(pending.destKey(), pending.uploadId(), pending.parts());
  suite.verifyPartContent();
}
/**
 * Writes the output of a single task, checks the pending file, aborts the task and then runs
 * the suite's checks that no pending file, no multipart upload and no task attempt path is
 * left.
 */
@Test
public void testAbortTask() throws Exception {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  jobSuite.setupJob();
  jobSuite.setupTask();

  // State before any output has been written.
  jobSuite.assertNoMagicPendingFile();
  jobSuite.assertMultipartUpload(0);

  // Write the output.
  jobSuite.writeOutput();

  // State once the output has been written.
  jobSuite.assertHasMagicPendingFile();
  jobSuite.assertNoMagicMultipartUpload();
  jobSuite.assertMultipartUpload(1);

  // Load the pending file and inspect its content.
  final Path pendingFile = jobSuite.magicPendingPath();
  final byte[] rawPending = CommitUtils.load(jobSuite.fs(), pendingFile);
  final Pending loaded = Pending.deserialize(rawPending);
  assertEquals(jobSuite.destPartKey(), loaded.destKey());
  assertEquals(20, loaded.length());
  assertEquals(1, loaded.parts().size());

  // Abort.
  jobSuite.abortTask();

  // State after the abort.
  jobSuite.assertNoMagicPendingFile();
  jobSuite.assertNoMagicMultipartUpload();
  jobSuite.assertMultipartUpload(0);
  jobSuite.assertNoTaskAttemptPath();
}
/**
 * Runs a job with one task through to the job commit and then runs the suite's checks on the
 * magic uploads and keys, the success marker, the summary report and the part content.
 */
@Test
public void testCommitJob() throws Exception {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  // One task: set up, write, commit.
  jobSuite.setupJob();
  jobSuite.setupTask();
  jobSuite.writeOutput();
  jobSuite.commitTask();

  // Job commit, preceded by the part-file check.
  jobSuite.assertNoPartFiles();
  jobSuite.commitJob();

  // Checks on the committed output.
  jobSuite.assertNoMagicMultipartUpload();
  jobSuite.assertNoMagicObjectKeys();
  jobSuite.assertSuccessMarker();
  jobSuite.assertSummaryReport(reportDir);
  jobSuite.verifyPartContent();
}
/**
 * Runs a job with one task through to the job commit, without any checks after the commit.
 *
 * <p>NOTE(review): despite the name, nothing here provokes or asserts a failure; the steps
 * match the first part of {@code testCommitJob} - confirm the intended scenario.
 */
@Test
public void testCommitJobFailed() throws Exception {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  // One task: set up, write, commit.
  jobSuite.setupJob();
  jobSuite.setupTask();
  jobSuite.writeOutput();
  jobSuite.commitTask();

  // Job commit, preceded by the part-file check.
  jobSuite.assertNoPartFiles();
  jobSuite.commitJob();
}
/**
 * Commits a job and then expects a second {@code commitTask()} call on the same suite to throw
 * {@link FileNotFoundException}.
 */
@Test
public void testTaskCommitAfterJobCommit() throws Exception {
  final JobSuite jobSuite = JobSuite.create(conf, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  // One task: set up, write, commit.
  jobSuite.setupJob();
  jobSuite.setupTask();
  jobSuite.writeOutput();
  jobSuite.commitTask();

  // Job commit, preceded by the part-file check.
  jobSuite.assertNoPartFiles();
  jobSuite.commitJob();

  // Checks on the committed output.
  jobSuite.assertNoMagicMultipartUpload();
  jobSuite.assertNoMagicObjectKeys();
  jobSuite.assertSuccessMarker();
  jobSuite.verifyPartContent();

  // A task commit after the job commit is expected to fail.
  assertThrows(FileNotFoundException.class, () -> jobSuite.commitTask());
}
/**
 * Exercises task setup with task attempt contexts whose job id differs from the one in
 * {@code taskAttempt0}: once with {@code CommitUtils.SPARK_WRITE_UUID} unset, where
 * {@code setupTask} is expected to throw {@link IllegalArgumentException}, and once with the
 * original value restored, where the task and the job are committed and the output is checked.
 */
@Test
public void testTaskCommitWithConsistentJobId() throws Exception {
  final Configuration config = newConf();
  final String sparkUuid = randomFormedJobId();
  config.set(CommitUtils.SPARK_WRITE_UUID, sparkUuid);
  final JobSuite jobSuite = JobSuite.create(config, taskAttempt0, outputPath);
  Assumptions.assumeFalse(jobSuite.skipTests());

  // At this point there are two candidate job ids: the spark uuid and the job id carried by the
  // task attempt. The job committer will adopt the former.
  jobSuite.setupJob();

  // Drop the spark uuid and build a task attempt with a different job id. The committer would
  // then take the job id of the task attempt, which is not consistent with the one it holds.
  config.unset(CommitUtils.SPARK_WRITE_UUID);
  final JobConf confWithoutUuid = new JobConf(config);
  final TaskAttemptID mismatchedAttemptId =
      JobSuite.createTaskAttemptId(randomTrimmedJobId(), JobSuite.DEFAULT_APP_ATTEMPT_ID);
  final TaskAttemptContext mismatchedContext = JobSuite.createTaskAttemptContext(
      confWithoutUuid, mismatchedAttemptId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
  // NOTE(review): the third argument is the message reported when no exception is thrown; it is
  // not compared with the exception text - confirm that this is the intent.
  assertThrows(IllegalArgumentException.class, () -> jobSuite.setupTask(mismatchedContext),
      "JobId set in the context");

  // With the spark uuid restored, yet another task attempt still leads to a consistent job id
  // in the committer.
  config.set(CommitUtils.SPARK_WRITE_UUID, sparkUuid);
  config.set(FileOutputFormat.OUTDIR, outputPath.toString());
  final JobConf confWithUuid = new JobConf(config);
  final TaskAttemptID matchingAttemptId =
      JobSuite.createTaskAttemptId(randomTrimmedJobId(), JobSuite.DEFAULT_APP_ATTEMPT_ID);
  final TaskAttemptContext matchingContext = JobSuite.createTaskAttemptContext(
      confWithUuid, matchingAttemptId, JobSuite.DEFAULT_APP_ATTEMPT_ID);

  jobSuite.setupTask(matchingContext);
  // Writing and committing must use the same task context as the task setup.
  jobSuite.writeOutput(matchingContext);
  jobSuite.commitTask(matchingContext);
  jobSuite.assertPendingSetAtRightLocation();

  // Job commit, preceded by the part-file check.
  jobSuite.assertNoPartFiles();
  jobSuite.commitJob();

  // Checks on the committed output.
  jobSuite.assertNoMagicMultipartUpload();
  jobSuite.assertNoMagicObjectKeys();
  jobSuite.assertSuccessMarker();
  jobSuite.verifyPartContent();
}
}