AbstractYarnClusterITest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
/**
* Full integration test MR jobs.
*
* This is all done on shared static mini YARN and (optionally) HDFS clusters,
* set up before any of the test methods run.
*
* To isolate tests properly for parallel test runs, that static state
* needs to be stored in the final classes implementing the tests, and
* exposed to the base class, with the setup clusters in the
* specific test suites creating the clusters with unique names.
*
* This is "hard" to do in Java, unlike, say, Scala.
*
* Note: this turns out not to be the root cause of ordering problems
* with the Terasort tests (that is hard coded use of a file in the local FS),
* but this design here does make it clear that the before and after class
* operations are explicitly called in the subclasses.
* If two subclasses of this class are instantiated in the same JVM, in order,
* they are guaranteed to be isolated.
*
*/
public abstract class AbstractYarnClusterITest extends AbstractCommitITest {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractYarnClusterITest.class);

  /** Number of files (mappers) used in a normal test run. */
  private static final int TEST_FILE_COUNT = 1;

  /** Number of files (mappers) used when scale tests are enabled. */
  private static final int SCALE_TEST_FILE_COUNT = 10;

  public static final int SCALE_TEST_KEYS = 100;

  public static final int BASE_TEST_KEYS = 10;

  /** Number of node managers passed to the mini YARN cluster. */
  public static final int NO_OF_NODEMANAGERS = 2;

  /**
   * Formatter for the timestamp suffix of generated cluster names.
   * {@link DateTimeFormatter} is immutable and thread-safe, so a single
   * shared instance is enough.
   */
  private static final DateTimeFormatter CLUSTER_TIMESTAMP_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS");

  /** Whether scale tests are enabled; resolved in {@link #setup()}. */
  private boolean scaleTest;

  /**
   * The static cluster binding with the lifecycle of this test; served
   * through instance-level methods for sharing across methods in the
   * suite.
   */
  @SuppressWarnings("StaticNonFinalField")
  private static ClusterBinding clusterBinding;

  /**
   * Terminate the shared cluster binding, if any, and clear the reference.
   * The reference is cleared even if termination fails, so that
   * {@link #setup()} never reuses a binding whose teardown was attempted.
   * @throws IOException failure
   */
  @AfterAll
  public static void teardownClusters() throws IOException {
    try {
      terminateCluster(clusterBinding);
    } finally {
      clusterBinding = null;
    }
  }

  /**
   * This is the cluster binding which every subclass must create.
   * It holds the cluster name, the mini YARN cluster and, optionally,
   * a mini HDFS cluster.
   */
  protected static final class ClusterBinding {

    private final String clusterName;

    /** HDFS cluster; null when the local filesystem is the cluster FS. */
    private final MiniDFSClusterService hdfs;

    private final MiniMRYarnCluster yarn;

    /**
     * Create a binding.
     * @param clusterName name of the cluster.
     * @param hdfs HDFS cluster service; may be null.
     * @param yarn YARN cluster; must not be null.
     */
    public ClusterBinding(
        final String clusterName,
        final MiniDFSClusterService hdfs,
        final MiniMRYarnCluster yarn) {
      this.clusterName = clusterName;
      this.hdfs = hdfs;
      this.yarn = checkNotNull(yarn);
    }

    /**
     * Get the HDFS cluster service.
     * @return the service, or null if no HDFS cluster was bound.
     */
    public MiniDFSClusterService getHdfs() {
      return hdfs;
    }

    /**
     * Get the cluster FS, which will either be HDFS or the local FS.
     * @return a filesystem.
     * @throws IOException failure
     */
    public FileSystem getClusterFS() throws IOException {
      MiniDFSClusterService miniCluster = getHdfs();
      return miniCluster != null
          ? miniCluster.getClusterFS()
          : FileSystem.getLocal(yarn.getConfig());
    }

    public MiniMRYarnCluster getYarn() {
      return yarn;
    }

    /**
     * Get the configuration of the YARN cluster.
     * @return the YARN cluster's configuration.
     */
    public Configuration getConf() {
      return getYarn().getConfig();
    }

    public String getClusterName() {
      return clusterName;
    }

    /**
     * Terminate the YARN cluster, then the HDFS cluster.
     * The HDFS reference may be null here.
     */
    public void terminate() {
      terminateService(getYarn());
      terminateService(getHdfs());
    }
  }

  /**
   * Create the cluster binding.
   * The configuration will be patched by propagating down options
   * from the maven build and turning off unwanted
   * YARN features.
   *
   * If an HDFS cluster is requested,
   * the HDFS and YARN clusters will share the same configuration, so
   * the HDFS cluster binding is implicitly propagated to YARN.
   * If one is not requested, the local filesystem is used as the cluster FS.
   *
   * If the YARN cluster fails to deploy, any HDFS cluster which was
   * already started is terminated before the failure is propagated.
   * @param conf configuration to start with.
   * @param useHDFS should an HDFS cluster be instantiated.
   * @return the cluster binding.
   * @throws IOException failure.
   */
  protected static ClusterBinding createCluster(
      final JobConf conf,
      final boolean useHDFS) throws IOException {
    prepareTestConfiguration(conf);
    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
    conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
    // minicluster tests overreact to not enough disk space.
    conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
    conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
    // create a cluster name from the current local time, down to
    // hundredths of a second.
    String clusterName = "yarn-"
        + LocalDateTime.now().format(CLUSTER_TIMESTAMP_FORMAT);
    final MiniDFSClusterService miniDFSClusterService =
        useHDFS
            ? deployService(conf, new MiniDFSClusterService())
            : null;
    boolean deployed = false;
    try {
      MiniMRYarnCluster yarnCluster = deployService(conf,
          new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS));
      ClusterBinding binding =
          new ClusterBinding(clusterName, miniDFSClusterService, yarnCluster);
      deployed = true;
      return binding;
    } finally {
      if (!deployed) {
        // don't leak a running HDFS cluster when YARN setup fails.
        terminateService(miniDFSClusterService);
      }
    }
  }

  /**
   * Terminate the cluster if it is not null.
   * @param cluster the cluster
   */
  protected static void terminateCluster(ClusterBinding cluster) {
    if (cluster != null) {
      cluster.terminate();
    }
  }

  /**
   * Get the cluster binding for this subclass.
   * @return the cluster binding; null until {@link #setup()} has bound one.
   */
  protected ClusterBinding getClusterBinding() {
    return clusterBinding;
  }

  /**
   * Get the YARN cluster of the current binding.
   * @return the YARN cluster.
   */
  protected MiniMRYarnCluster getYarn() {
    return getClusterBinding().getYarn();
  }

  /**
   * Get the cluster filesystem -hdfs or local.
   * @return the filesystem shared across the yarn nodes.
   * @throws IOException failure
   */
  protected FileSystem getClusterFS() throws IOException {
    return getClusterBinding().getClusterFS();
  }

  /**
   * We stage work into a temporary directory rather than directly under
   * the user's home directory, as that is often rejected by CI test
   * runners.
   * A new directory is created for every test method in {@link #setup()}.
   */
  public File stagingFilesDir;

  /**
   * The name of the committer as returned by
   * {@link AbstractS3ACommitter#getName()}
   * and used for committer construction.
   */
  protected abstract String committerName();

  /**
   * Create the cluster binding on demand rather than in a static
   * before-all method.
   * Subclasses can override this to change the binding options.
   * This implementation creates a cluster without HDFS.
   * @return the cluster binding
   * @throws Exception failure
   */
  protected ClusterBinding demandCreateClusterBinding() throws Exception {
    return createCluster(new JobConf(), false);
  }

  /**
   * Set up the test: resolve the scale test flag, create the shared
   * cluster binding if there is none yet, and create a new temporary
   * staging directory named after the test method.
   * @throws Exception failure
   */
  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    scaleTest = getTestPropertyBool(
        getConfiguration(),
        KEY_SCALE_TESTS_ENABLED,
        DEFAULT_SCALE_TESTS_ENABLED);
    if (getClusterBinding() == null) {
      clusterBinding = demandCreateClusterBinding();
    }
    assertNotNull(
        getClusterBinding(), "cluster is not bound");
    String methodName = getMethodName();
    // this must be a directory, not a plain file: its path is handed to
    // the committer as the staging temp path.
    stagingFilesDir = Files.createTempDirectory(methodName).toFile();
  }

  @Override
  protected int getTestTimeoutMillis() {
    return SCALE_TEST_TIMEOUT_SECONDS * 1000;
  }

  /**
   * Create a job configuration.
   * This creates a new job conf from the yarn
   * cluster configuration then calls
   * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
   * @return the new job configuration.
   * @throws IOException failure
   */
  protected JobConf newJobConf() throws IOException {
    JobConf jobConf = new JobConf(getYarn().getConfig());
    jobConf.addResource(getConfiguration());
    applyCustomConfigOptions(jobConf);
    return jobConf;
  }

  /**
   * Create a job named after the current test method, with its
   * configuration patched for this committer.
   * @param jobConf configuration to build the job from.
   * @return the new job.
   * @throws IOException failure
   */
  protected Job createJob(Configuration jobConf) throws IOException {
    Job mrJob = Job.getInstance(jobConf, getMethodName());
    patchConfigurationForCommitter(mrJob.getConfiguration());
    return mrJob;
  }

  /**
   * Patch the (job) configuration for this committer.
   * Sets the unique filenames option, binds the committer, passes down
   * the scale test flag and sets the staging temp path to
   * {@link #stagingFilesDir}.
   * @param jobConf configuration to patch
   * @return the patched configuration; the same instance as
   * {@code jobConf}.
   */
  protected Configuration patchConfigurationForCommitter(
      final Configuration jobConf) {
    jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
        isUniqueFilenames());
    bindCommitter(jobConf,
        CommitConstants.S3A_COMMITTER_FACTORY,
        committerName());
    // pass down the scale test flag
    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, isScaleTest());
    // and fix the commit dir to the local FS across all workers.
    String staging = stagingFilesDir.getAbsolutePath();
    LOG.info("Staging temp dir is {}", staging);
    jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, staging);
    return jobConf;
  }

  /**
   * Get the file count for the test.
   * @return the number of mappers to create.
   */
  public int getTestFileCount() {
    return isScaleTest() ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
  }

  /**
   * Override point to let implementations tune the MR Job conf.
   * This implementation does nothing.
   * @param jobConf configuration
   * @throws IOException failure
   */
  protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
  }

  /**
   * Override point for any committer specific validation operations;
   * called after the base assertions have all passed.
   * This implementation does nothing.
   * @param destPath destination of work
   * @param successData loaded success data
   * @param jobId job id
   * @throws Exception failure
   */
  protected void customPostExecutionValidation(Path destPath,
      SuccessData successData, String jobId)
      throws Exception {
  }

  /**
   * Assume that scale tests are enabled.
   */
  protected void requireScaleTestsEnabled() {
    assume("Scale test disabled: to enable set property " +
            KEY_SCALE_TESTS_ENABLED,
        isScaleTest());
  }

  /**
   * Is this a scale test run?
   * @return the flag resolved in {@link #setup()}.
   */
  public boolean isScaleTest() {
    return scaleTest;
  }

  /**
   * Value to set for the unique filenames option of the committer.
   * @return false in this implementation.
   */
  public boolean isUniqueFilenames() {
    return false;
  }
}