ITestS3ACommitterFactory.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Tests for the committer factory creation/override process.
*/
public final class ITestS3ACommitterFactory extends AbstractCommitITest {
private static final Logger LOG = LoggerFactory.getLogger(
ITestS3ACommitterFactory.class);
/**
* Name for invalid committer: {@value}.
*/
private static final String INVALID_NAME = "invalid-name";
/**
* Counter to guarantee that even in parallel test runs, no job has the same
* ID.
*/
private String jobId;
// A random task attempt id for testing.
private String attempt0;
private TaskAttemptID taskAttempt0;
private Path outDir;
private S3ACommitterFactory factory;
private TaskAttemptContext tContext;
/**
* Parameterized list of bindings of committer name in config file to
* expected class instantiated.
*/
private static final Object[][] BINDINGS = {
{"", "", FileOutputCommitter.class, "Default Binding"},
{COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in FS"},
{COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
"partitoned committer in FS"},
{COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer in FS"},
{COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer in FS"},
{COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir committer in FS"},
{INVALID_NAME, "", null, "invalid committer in FS"},
{"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in task"},
{"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
"partioned committer in task"},
{"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer in task"},
{"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer in task"},
{"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir committer in task"},
{"", INVALID_NAME, null, "invalid committer in task"},
};
/**
* Test array for parameterized test runs.
*
* @return the committer binding for this run.
*/
public static Collection<Object[]> params() {
return Arrays.asList(BINDINGS);
}
/**
* Name of committer to set in filesystem config. If "" do not set one.
*/
private String fsCommitterName;
/**
* Name of committer to set in job config.
*/
private String jobCommitterName;
/**
* Expected committer class.
* If null: an exception is expected
*/
private Class<? extends AbstractS3ACommitter> committerClass;
/**
* Description from parameters, simply for thread names to be more informative.
*/
private String description;
/**
* Create a parameterized instance.
* @param pFsCommitterName committer to set in filesystem config
* @param pJobCommitterName committer to set in job config
* @param pCommitterClass expected committer class
* @param pDescription debug text for thread names.
*/
public void initITestS3ACommitterFactory(
final String pFsCommitterName,
final String pJobCommitterName,
final Class<? extends AbstractS3ACommitter> pCommitterClass,
final String pDescription) throws Exception {
this.fsCommitterName = pFsCommitterName;
this.jobCommitterName = pJobCommitterName;
this.committerClass = pCommitterClass;
this.description = pDescription;
setup();
}
@Override
protected Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
// do not cache, because we want the committer one to pick up
// the fs with fs-specific configuration
conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
maybeSetCommitterName(conf, fsCommitterName);
return conf;
}
/**
* Set a committer name in a configuration.
* @param conf configuration to patch.
* @param name name. If "" the option is unset.
*/
private static void maybeSetCommitterName(final Configuration conf, final String name) {
if (!name.isEmpty()) {
conf.set(FS_S3A_COMMITTER_NAME, name);
} else {
conf.unset(FS_S3A_COMMITTER_NAME);
}
}
@Override
public void setup() throws Exception {
// destroy all filesystems from previous runs.
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
super.setup();
jobId = randomJobId();
attempt0 = "attempt_" + jobId + "_m_000000_0";
taskAttempt0 = TaskAttemptID.forName(attempt0);
outDir = methodPath();
factory = new S3ACommitterFactory();
final Configuration fsConf = getConfiguration();
JobConf jobConf = new JobConf(fsConf);
jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
maybeSetCommitterName(jobConf, jobCommitterName);
tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
LOG.info("{}: Filesystem Committer='{}'; task='{}'",
description,
fsConf.get(FS_S3A_COMMITTER_NAME),
jobConf.get(FS_S3A_COMMITTER_NAME));
}
@Override
protected void deleteTestDirInTeardown() {
// no-op
}
/**
* Verify that if all config options are unset, the FileOutputCommitter
* is returned.
*/
@MethodSource("params")
@ParameterizedTest(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
public void testBinding(String pFsCommitterName,
String pJobCommitterName,
Class<? extends AbstractS3ACommitter> pCommitterClass,
String pDescription) throws Throwable {
initITestS3ACommitterFactory(pFsCommitterName, pJobCommitterName, pCommitterClass,
pDescription);
assertFactoryCreatesExpectedCommitter(committerClass);
}
/**
* Assert that the factory creates the expected committer.
* If a null committer is passed in, a {@link PathIOException}
* is expected.
* @param expected expected committer class.
* @throws Exception IO failure.
*/
private void assertFactoryCreatesExpectedCommitter(
final Class expected)
throws Exception {
describe("Creating committer: expected class \"%s\"", expected);
if (expected != null) {
assertEquals(expected, createCommitter().getClass(),
"Wrong Committer from factory");
} else {
intercept(PathCommitException.class, this::createCommitter);
}
}
/**
* Create a committer.
* @return the committer
* @throws IOException IO failure.
*/
private PathOutputCommitter createCommitter() throws IOException {
return factory.createOutputCommitter(outDir, tContext);
}
}