MRJobTestBase.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.tosfs.commit;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.terasort.TeraGen;
import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.TestEnv;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
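
/**
 * Base class for MapReduce integration tests that run TeraGen, TeraSort and WordCount on a
 * {@link MiniMRYarnCluster} whose job output is committed through the TOS committer classes.
 * Subclasses inject their filesystem-specific configuration via {@link #setConf(Configuration)}
 * before this class's {@code @BeforeAll} runs. A minimal sketch (the subclass name and the
 * bucket/property values below are hypothetical):
 *
 * <pre>
 *   public class TestTosMRJob extends MRJobTestBase {
 *     static {
 *       // Hypothetical example values; a static initializer runs at class load,
 *       // before beforeClass().
 *       Configuration conf = new Configuration();
 *       conf.set("fs.defaultFS", "tos://test-bucket/");
 *       setConf(conf);
 *     }
 *   }
 * </pre>
 */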
public abstract class MRJobTestBase {
private static final Logger LOG = LoggerFactory.getLogger(MRJobTestBase.class);
private static Configuration conf = new Configuration();
private static MiniMRYarnCluster yarnCluster;
private static FileSystem fs;
private static Path testDataPath;
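
  /**
   * Replaces the configuration used to bootstrap the YARN cluster and filesystem. Must be called
   * before {@link #beforeClass()} runs, e.g. from a subclass static initializer.
   */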
public static void setConf(Configuration newConf) {
conf = newConf;
}
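
  /**
   * Boots a two-NodeManager {@link MiniMRYarnCluster}, registering the TOS committer for both the
   * old- and new-API output committer keys, and creates a unique test data root on the filesystem.
   */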
@BeforeAll
public static void beforeClass() throws IOException {
assumeTrue(TestEnv.checkTestEnabled());
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
conf.set("mapreduce.outputcommitter.factory.scheme.tos",
CommitterFactory.class.getName()); // 3x newApiCommitter=true.
conf.set("mapred.output.committer.class",
Committer.class.getName()); // 2x and 3x newApiCommitter=false.
conf.set("mapreduce.outputcommitter.class",
org.apache.hadoop.fs.tosfs.commit.Committer.class.getName()); // 2x newApiCommitter=true.
// Start the yarn cluster.
yarnCluster = new MiniMRYarnCluster("yarn-" + System.currentTimeMillis(), 2);
LOG.info("Default filesystem: {}", conf.get("fs.defaultFS"));
LOG.info("Default filesystem implementation: {}", conf.get("fs.AbstractFileSystem.tos.impl"));
yarnCluster.init(conf);
yarnCluster.start();
fs = FileSystem.get(conf);
testDataPath = new Path("/mr-test-" + UUIDUtils.random())
.makeQualified(fs.getUri(), fs.getWorkingDirectory());
}

  @AfterAll
public static void afterClass() throws IOException {
if (!TestEnv.checkTestEnabled()) {
return;
}
fs.delete(testDataPath, true);
if (yarnCluster != null) {
yarnCluster.stop();
}
}

  @AfterEach
  public void after() throws IOException {
    // Intentionally empty: no per-test cleanup is currently required.
  }
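
  /**
   * Runs TeraGen to write 1000 rows of 100 bytes each, then verifies the success marker: it must
   * list both map output files, whose sizes must add up to exactly 100 * 1000 bytes.
   */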
@Test
public void testTeraGen() throws Exception {
Path teraGenPath =
new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path output = new Path(teraGenPath, "output");
JobConf jobConf = new JobConf(yarnCluster.getConfig());
jobConf.addResource(conf);
jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
String[] args = new String[]{Integer.toString(1000), output.toString()};
int result = ToolRunner.run(jobConf, new TeraGen(), args);
assertEquals(0, result, String.format("teragen %s", StringUtils.join(" ", args)));
// Verify the success data.
ObjectStorage storage = ObjectStorageFactory.create(
output.toUri().getScheme(), output.toUri().getAuthority(), conf);
int byteSizes = 0;
Path success = new Path(output, CommitUtils.SUCCESS);
byte[] serializedData = CommitUtils.load(fs, success);
SuccessData successData = SuccessData.deserialize(serializedData);
assertTrue(successData.success(), "Should execute successfully");
// Assert the destination paths.
assertEquals(2, successData.filenames().size());
successData.filenames().sort(String::compareTo);
assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00000")),
successData.filenames().get(0));
assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00001")),
successData.filenames().get(1));
for (String partFileKey : successData.filenames()) {
ObjectInfo objectInfo = storage.head(partFileKey);
      assertNotNull(objectInfo, "Output file should exist");
byteSizes += objectInfo.size();
}
    assertEquals(100 /* bytes per row */ * 1000 /* rows */, byteSizes);
}
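
  /**
   * Runs TeraSort over the files written by {@link #testTeraGen()} (this test therefore expects
   * that TeraGen output to already exist) and verifies the single committed reduce output file.
   */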
@Test
public void testTeraSort() throws Exception {
Path teraGenPath =
new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path inputPath = new Path(teraGenPath, "output");
Path outputPath = new Path(teraGenPath, "sortOutput");
JobConf jobConf = new JobConf(yarnCluster.getConfig());
jobConf.addResource(conf);
jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
String[] args = new String[]{inputPath.toString(), outputPath.toString()};
int result = ToolRunner.run(jobConf, new TeraSort(), args);
assertEquals(0, result, String.format("terasort %s", StringUtils.join(" ", args)));
// Verify the success data.
ObjectStorage storage = ObjectStorageFactory
.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf);
int byteSizes = 0;
Path success = new Path(outputPath, CommitUtils.SUCCESS);
byte[] serializedData = CommitUtils.load(fs, success);
SuccessData successData = SuccessData.deserialize(serializedData);
assertTrue(successData.success(), "Should execute successfully");
// Assert the destination paths.
assertEquals(1, successData.filenames().size());
successData.filenames().sort(String::compareTo);
assertEquals(ObjectUtils.pathToKey(new Path(outputPath, "part-r-00000")),
successData.filenames().get(0));
for (String partFileKey : successData.filenames()) {
ObjectInfo objectInfo = storage.head(partFileKey);
      assertNotNull(objectInfo, "Output file should exist");
byteSizes += objectInfo.size();
}
    assertEquals(100 /* bytes per row */ * 1000 /* rows */, byteSizes);
}
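
  /**
   * Runs the old-API {@code mapred} WordCount over a small inline input, then checks the success
   * marker and the per-word counts.
   */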
@Disabled
@Test
public void testWordCount() throws Exception {
Path wordCountPath =
new Path(testDataPath, "wc").makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path output = new Path(wordCountPath, "output");
Path input = new Path(wordCountPath, "input");
JobConf jobConf = new JobConf(yarnCluster.getConfig());
jobConf.addResource(conf);
if (!fs.mkdirs(input)) {
throw new IOException("Mkdirs failed to create " + input.toString());
}
    try (DataOutputStream file = fs.create(new Path(input, "part-0"))) {
      file.writeBytes("a a b c");
    }
String[] args = new String[]{input.toString(), output.toString()};
int result = ToolRunner.run(jobConf, new WordCount(), args);
assertEquals(0, result, String.format("WordCount %s", StringUtils.join(" ", args)));
// Verify the success path.
assertTrue(fs.exists(new Path(output, CommitUtils.SUCCESS)));
assertTrue(fs.exists(new Path(output, "part-00000")));
Path success = new Path(output, CommitUtils.SUCCESS);
    assertTrue(CommitUtils.load(fs, success).length != 0, "Success file must not be empty");
    byte[] serializedData = CommitUtils.load(fs, new Path(output, "part-00000"));
    String outputAsStr = new String(serializedData, StandardCharsets.UTF_8);
Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
assertEquals(2, (int) resAsMap.get("a"));
assertEquals(1, (int) resAsMap.get("b"));
assertEquals(1, (int) resAsMap.get("c"));
}
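
  /** Parses WordCount's tab-separated {@code word\tcount} output lines into a word-to-count map. */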
private Map<String, Integer> getResultAsMap(String outputAsStr) {
Map<String, Integer> result = new HashMap<>();
for (String line : outputAsStr.split("\n")) {
String[] tokens = line.split("\t");
      assertTrue(tokens.length > 1,
          String.format("Not enough tokens in line '%s' from output %s", line, outputAsStr));
result.put(tokens[0], Integer.parseInt(tokens[1]));
}
return result;
}
}