AbstractAbfsClusterITest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.junit.Assume;
import org.junit.Rule;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.rules.TemporaryFolder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.DurationInfo;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_SERVICE_PRINCIPAL_OBJECT_ID;
import static org.apache.hadoop.io.IOUtils.closeStream;
/**
 * Tests which create a YARN minicluster.
 * These are all considered scale tests; the probe for
 * scale tests being enabled is executed before the cluster
 * is set up to avoid wasting time on non-scale runs.
 * <p>
 * The cluster is created on demand by the first test case which needs it,
 * kept in a static field so that it is shared across the test cases of the
 * suite, and terminated in {@link #teardownClusters()}.
 */
public abstract class AbstractAbfsClusterITest extends
    AbstractManifestCommitterTest {

  /** Number of node managers the YARN minicluster is started with. */
  public static final int NO_OF_NODEMANAGERS = 2;

  /**
   * Format of the timestamp embedded in cluster names: date and time
   * to a resolution of hundredths of a second ("SS").
   * DateTimeFormatter is immutable, so a single shared instance is used.
   */
  private static final DateTimeFormatter CLUSTER_NAME_TIMESTAMP =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS");

  /** Binding to the ABFS test account and configuration. */
  private final ABFSContractTestBinding binding;

  /**
   * The static cluster binding with the lifecycle of this test; served
   * through instance-level methods for sharing across methods in the
   * suite.
   */
  @SuppressWarnings("StaticNonFinalField")
  private static ClusterBinding clusterBinding;

  /**
   * We stage work into a temporary directory rather than directly under
   * the user's home directory, as that is often rejected by CI test
   * runners.
   * <p>
   * This is a JUnit 4 {@link TemporaryFolder}, but this class runs under
   * JUnit Jupiter, which does not apply {@code @Rule} fields. Its lifecycle
   * is therefore driven explicitly: it is created in {@link #setup()} and
   * deleted in {@link #deleteStagingFilesDir()}.
   */
  public final TemporaryFolder stagingFilesDir = new TemporaryFolder();

  /**
   * Create the test, binding to the ABFS contract test configuration.
   * @throws Exception failure to create the binding.
   */
  protected AbstractAbfsClusterITest() throws Exception {
    binding = new ABFSContractTestBinding();
  }

  @Override
  protected int getTestTimeoutMillis() {
    return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
  }

  /**
   * Set up the ABFS binding and the superclass, then skip unless scale
   * tests are enabled.
   * If they are enabled, create the staging directory and, if not
   * already present, the shared YARN cluster.
   * @throws Exception failure
   */
  @BeforeEach
  @Override
  public void setup() throws Exception {
    binding.setup();
    super.setup();
    requireScaleTestsEnabled();
    // Jupiter does not honour @Rule, so the temporary folder must be
    // created explicitly before any test can use it.
    stagingFilesDir.create();
    if (getClusterBinding() == null) {
      clusterBinding = demandCreateClusterBinding();
    }
    assertNotNull(getClusterBinding(), "cluster is not bound");
  }

  /**
   * Delete the staging directory created in {@link #setup()}.
   * TemporaryFolder.delete() is a no-op if the folder was never created,
   * e.g. when the scale test assumption skipped the test.
   */
  @AfterEach
  public void deleteStagingFilesDir() {
    stagingFilesDir.delete();
  }

  /**
   * Terminate the shared cluster (if any) once all tests in the suite
   * have run, and clear the static reference.
   * @throws IOException declared for subclasses; not thrown here.
   */
  @AfterAll
  public static void teardownClusters() throws IOException {
    terminateCluster(clusterBinding);
    clusterBinding = null;
  }

  @Override
  protected AbstractFSContract createContract(final Configuration conf) {
    return new AbfsFileSystemContract(conf, binding.isSecureMode());
  }

  @Override
  protected Configuration createConfiguration() {
    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
  }

  /**
   * This is the cluster binding which every subclass must create.
   */
  protected static final class ClusterBinding {

    /** Name the YARN minicluster was created with. */
    private final String clusterName;

    /** The YARN minicluster; never null. */
    private final MiniMRYarnCluster yarn;

    /**
     * Bind to a cluster.
     * @param clusterName name of the cluster.
     * @param yarn the YARN minicluster; must not be null.
     * @throws NullPointerException if {@code yarn} is null.
     */
    public ClusterBinding(
        final String clusterName,
        final MiniMRYarnCluster yarn) {
      this.clusterName = clusterName;
      this.yarn = requireNonNull(yarn, "yarn");
    }

    /**
     * Get the cluster FS. This is always the local filesystem, bound
     * to the YARN cluster's configuration.
     * @return a filesystem.
     * @throws IOException failure
     */
    public FileSystem getClusterFS() throws IOException {
      return FileSystem.getLocal(yarn.getConfig());
    }

    /**
     * @return the YARN minicluster.
     */
    public MiniMRYarnCluster getYarn() {
      return yarn;
    }

    /**
     * @return the configuration of the YARN minicluster.
     */
    public Configuration getConf() {
      return getYarn().getConfig();
    }

    /**
     * @return the name the cluster was created with.
     */
    public String getClusterName() {
      return clusterName;
    }

    /**
     * Stop the YARN minicluster by closing it.
     */
    public void terminate() {
      closeStream(getYarn());
    }
  }

  /**
   * Create the cluster binding.
   * The configuration is patched to disable the MR job history cleaner
   * before a YARN minicluster with {@link #NO_OF_NODEMANAGERS} node
   * managers is initialized and started with it.
   * The cluster filesystem is the local filesystem.
   * @param conf configuration to start with; it is modified in place.
   * @return the cluster binding.
   * @throws IOException failure.
   */
  protected static ClusterBinding createCluster(
      final JobConf conf) throws IOException {
    try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
      conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
      // create a unique cluster name based on the current time,
      // to a resolution of hundredths of a second.
      String timestamp = LocalDateTime.now().format(CLUSTER_NAME_TIMESTAMP);
      String clusterName = "yarn-" + timestamp;
      MiniMRYarnCluster yarnCluster =
          new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
      yarnCluster.init(conf);
      yarnCluster.start();
      return new ClusterBinding(clusterName, yarnCluster);
    }
  }

  /**
   * Terminate the cluster if it is not null.
   * @param cluster the cluster; may be null.
   */
  protected static void terminateCluster(ClusterBinding cluster) {
    if (cluster != null) {
      cluster.terminate();
    }
  }

  /**
   * Get the cluster binding for this subclass.
   * @return the cluster binding; null until one has been created.
   */
  protected ClusterBinding getClusterBinding() {
    return clusterBinding;
  }

  /**
   * Get the YARN minicluster of the current binding.
   * Requires the cluster binding to have been created.
   * @return the YARN minicluster.
   */
  protected MiniMRYarnCluster getYarn() {
    return getClusterBinding().getYarn();
  }

  /**
   * Create the cluster binding on demand rather than in a BeforeClass
   * static method.
   * Subclasses can override this to change the binding options.
   * @return the cluster binding
   * @throws Exception failure to create the cluster.
   */
  protected ClusterBinding demandCreateClusterBinding() throws Exception {
    return createCluster(new JobConf());
  }

  /**
   * Create a job configuration.
   * This creates a new job conf from the YARN cluster configuration,
   * adds the test configuration as a resource, and then calls
   * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
   * With OAuth authentication, the test is skipped unless a service
   * principal object ID is configured; if it is, the owner override
   * options are set and the ABFS secure-scheme FS cache is disabled.
   * @return the new job configuration.
   * @throws IOException failure
   */
  protected JobConf newJobConf() throws IOException {
    JobConf jobConf = new JobConf(getYarn().getConfig());
    jobConf.addResource(getConfiguration());
    if (getConfiguration().getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
        AuthType.SharedKey) == AuthType.OAuth) {
      assumeValidTestConfigPresent(
          FS_AZURE_BLOB_FS_CLIENT_SERVICE_PRINCIPAL_OBJECT_ID);
      String pid = jobConf.get(FS_AZURE_BLOB_FS_CLIENT_SERVICE_PRINCIPAL_OBJECT_ID);
      jobConf.set(FS_AZURE_OVERRIDE_OWNER_SP, pid);
      jobConf.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "*");
      jobConf.setBoolean(String.format("fs.%s.impl.disable.cache",
          FileSystemUriSchemes.ABFS_SECURE_SCHEME), true);
    }
    applyCustomConfigOptions(jobConf);
    return jobConf;
  }

  /**
   * Patch the (job) configuration for this committer.
   * @param jobConf configuration to patch; modified in place.
   * @return the same configuration, now set up to use the manifest committer.
   */
  protected Configuration patchConfigurationForCommitter(
      final Configuration jobConf) {
    enableManifestCommitter(jobConf);
    return jobConf;
  }

  /**
   * Override point to let implementations tune the MR Job conf.
   * The default implementation does nothing.
   * @param jobConf configuration
   * @throws IOException failure
   */
  protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
  }

  /**
   * Assume that scale tests are enabled.
   */
  protected void requireScaleTestsEnabled() {
    assumeScaleTestsEnabled(getConfiguration());
  }

  /**
   * Skip the test unless the given key has a non-empty value in the
   * test configuration.
   * @param key configuration key to probe.
   */
  protected void assumeValidTestConfigPresent(final String key) {
    String configuredValue = getConfiguration().get(key);
    Assume.assumeTrue("Test configuration option not set: " + key,
        configuredValue != null && !configuredValue.isEmpty());
  }
}