/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;

/**
* Test the magic committer's commit protocol.
*/
public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
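  /**
   * Test parameter: whether the committer tracks commit data in memory
   * ({@code FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}).
   */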
private boolean trackCommitsInMemory;
@Override
protected String suitename() {
return "ITestMagicCommitProtocol";
}
@Override
protected String getCommitterFactoryName() {
return CommitConstants.S3A_COMMITTER_FACTORY;
}
@Override
protected String getCommitterName() {
return CommitConstants.COMMITTER_NAME_MAGIC;
}
@Override
public void setup() throws Exception {
super.setup();
CommitUtils.verifyIsMagicCommitFS(getFileSystem());
}
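  /**
   * Parameterization: run the suite with in-memory commit tracking
   * disabled and then enabled.
   * @return the test parameters
   */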
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{false},
{true}
});
}
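  /**
   * Initialize a parameterized test case: record the parameter, then
   * run the suite setup so that {@link #createConfiguration()} picks
   * it up.
   * @param pTrackCommitsInMemory whether to track commits in memory
   * @throws Exception setup failure
   */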
public void initITestMagicCommitProtocol(boolean pTrackCommitsInMemory)
throws Exception {
this.trackCommitsInMemory = pTrackCommitsInMemory;
setup();
}
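  /**
   * Create the test configuration: clear any base/bucket override of
   * the in-memory tracking option, then set it from the test parameter.
   * @return the configuration to use
   */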
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
    removeBaseAndBucketOverrides(conf,
        FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED);
    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
        trackCommitsInMemory);
return conf;
}
@Override
public void assertJobAbortCleanedUp(JobData jobData)
throws Exception {
// special handling of magic directory; harmless in staging
Path magicDir = getMagicJobPath(jobData.getCommitter().getUUID(), getOutDir());
ContractTestUtils.assertPathDoesNotExist(getFileSystem(),
"magic dir ", magicDir);
super.assertJobAbortCleanedUp(jobData);
}
@Override
protected MagicS3GuardCommitter createCommitter(
Path outputPath,
TaskAttemptContext context)
throws IOException {
return new MagicS3GuardCommitter(outputPath, context);
}
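  /**
   * Create a committer with fault injection wired in: the first commit
   * attempt fails, after which the operation succeeds.
   * @param tContext task attempt context
   * @return a committer whose first commit fails
   * @throws IOException failure to instantiate the committer
   */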
  @Override
  public MagicS3GuardCommitter createFailingCommitter(
TaskAttemptContext tContext) throws IOException {
return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
}
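  /**
   * While data is being written, the task attempt path must be a magic
   * path and, as the write is still pending, the path itself must not
   * yet be visible.
   * @param p path being written to
   * @param expectedLength expected length of the data
   * @param jobId job ID
   * @throws IOException IO failure
   */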
  @Override
  protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength,
String jobId) throws IOException {
String pathStr = p.toString();
Assertions.assertThat(pathStr)
.describedAs("Magic path")
.contains("/" + MAGIC_PATH_PREFIX + jobId + "/");
assertPathDoesNotExist("task attempt visible", p);
}
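  /**
   * After the write, the .pending file must exist, a listing of the
   * marker's parent directory must report the marker as zero bytes
   * long, and the test helper must accept it as a marker file of the
   * expected length.
   * @param marker path of the marker file
   * @param expectedLength length expected to be recorded by the marker
   * @throws IOException IO failure
   */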
  @Override
  protected void validateTaskAttemptPathAfterWrite(Path marker,
final long expectedLength) throws IOException {
// the pending file exists
Path pendingFile = new Path(marker.toString() + PENDING_SUFFIX);
assertPathExists("pending file", pendingFile);
S3AFileSystem fs = getFileSystem();
    // THIS SEQUENCE MUST BE RUN IN ORDER ON AN S3GUARDED
    // STORE
    // if you list the parent dir and find the marker, it
    // is really 0 bytes long
String name = marker.getName();
List<LocatedFileStatus> filtered = toList(listAndFilter(fs,
marker.getParent(), false,
(path) -> path.getName().equals(name)));
Assertions.assertThat(filtered)
.hasSize(1);
Assertions.assertThat(filtered.get(0))
.matches(lst -> lst.getLen() == 0,
"Listing should return 0 byte length");
// marker file is empty
getTestHelper().assertIsMarkerFile(marker, expectedLength);
}
/**
* The magic committer paths are always on S3, and always have
* "MAGIC PATH" in the path.
* @param committer committer instance
* @param context task attempt context
* @throws IOException IO failure
*/
@Override
protected void validateTaskAttemptWorkingDirectory(
final AbstractS3ACommitter committer,
final TaskAttemptContext context) throws IOException {
URI wd = committer.getWorkPath().toUri();
assertEquals("s3a", wd.getScheme(),
"Wrong schema for working dir " + wd
+ " with committer " + committer);
Assertions.assertThat(wd.getPath())
.contains("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/");
}
  /**
   * Verify that the "MAGIC PATH" for the application/tasks uses the
   * committer UUID to ensure uniqueness when more than one job writes
   * to the same destination path.
   */
@MethodSource("params")
@ParameterizedTest(name = "track-commit-in-memory-{0}")
public void testCommittersPathsHaveUUID(boolean pTrackCommitsInMemory) throws Throwable {
initITestMagicCommitProtocol(pTrackCommitsInMemory);
TaskAttemptContext tContext = new TaskAttemptContextImpl(
getConfiguration(),
getTaskAttempt0());
MagicS3GuardCommitter committer = createCommitter(getOutDir(), tContext);
String ta0 = getTaskAttempt0().toString();
// magic path for the task attempt
Path taskAttemptPath = committer.getTaskAttemptPath(tContext);
Assertions.assertThat(taskAttemptPath.toString())
.describedAs("task path of %s", committer)
.contains(committer.getUUID())
.contains("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/")
.doesNotContain(TEMP_DATA)
.endsWith(BASE)
.contains(ta0);
// temp path for files which the TA will create with an absolute path
// and which need renaming into place.
Path tempTaskAttemptPath = committer.getTempTaskAttemptPath(tContext);
Assertions.assertThat(tempTaskAttemptPath.toString())
.describedAs("Temp task path of %s", committer)
.contains(committer.getUUID())
.contains(TEMP_DATA)
.doesNotContain("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/")
.doesNotContain(BASE)
.contains(ta0);
}
  /**
   * Verify that the magic committer deletes the job attempt path after
   * a commit when cleanup is enabled, and retains it when cleanup is
   * disabled.
   */
@Test
public void testCommitterCleanup() throws Throwable {
describe("Committer cleanup enabled. hence it should delete the task attempt path after commit");
JobData jobData = startJob(true);
JobContext jContext = jobData.getJContext();
TaskAttemptContext tContext = jobData.getTContext();
AbstractS3ACommitter committer = jobData.getCommitter();
commit(committer, jContext, tContext);
assertJobAttemptPathDoesNotExist(committer, jContext);
describe("Committer cleanup is disabled. hence it should not delete the task attempt path after commit");
JobData jobData2 = startJob(true);
JobContext jContext2 = jobData2.getJContext();
TaskAttemptContext tContext2 = jobData2.getTContext();
AbstractS3ACommitter committer2 = jobData2.getCommitter();
committer2.getConf().setBoolean(FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, false);
commit(committer2, jContext2, tContext2);
assertJobAttemptPathExists(committer2, jContext2);
}
  /**
   * A committer which wires in a {@link CommitterFaultInjectionImpl}
   * so that the commit fails the first time it is attempted and
   * succeeds thereafter.
   */
private static final class CommitterWithFailedThenSucceed extends
MagicS3GuardCommitter implements CommitterFaultInjection {
private final CommitterFaultInjectionImpl injection;
CommitterWithFailedThenSucceed(Path outputPath,
TaskAttemptContext context) throws IOException {
super(outputPath, context);
injection = new CommitterFaultInjectionImpl(outputPath, context, true);
}
@Override
public void setupJob(JobContext context) throws IOException {
injection.setupJob(context);
super.setupJob(context);
}
@Override
public void abortJob(JobContext context, JobStatus.State state)
throws IOException {
injection.abortJob(context, state);
super.abortJob(context, state);
}
@Override
@SuppressWarnings("deprecation")
public void cleanupJob(JobContext context) throws IOException {
injection.cleanupJob(context);
super.cleanupJob(context);
}
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
injection.setupTask(context);
super.setupTask(context);
}
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
injection.commitTask(context);
super.commitTask(context);
}
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
injection.abortTask(context);
super.abortTask(context);
}
@Override
public void commitJob(JobContext context) throws IOException {
injection.commitJob(context);
super.commitJob(context);
}
@Override
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
injection.needsTaskCommit(context);
return super.needsTaskCommit(context);
}
@Override
public void setFaults(CommitterFaultInjection.Faults... faults) {
injection.setFaults(faults);
}
}
}