/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.dynamometer.workloadgenerator;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditCommandParser;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ImpersonationProvider;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link WorkloadDriver} and related classes. */
public class TestWorkloadGenerator {
private static final Logger LOG =
LoggerFactory.getLogger(TestWorkloadGenerator.class);
private Configuration conf;
private MiniDFSCluster miniCluster;
private FileSystem dfs;
@BeforeEach
public void setup() throws Exception {
conf = new Configuration();
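    // Allow the test user to impersonate the users that appear in the audit
    // traces (e.g. "hdfs"); the provider below only verifies that the real
    // user matches the user running the MiniDFSCluster.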
conf.setClass(HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS,
AllowUserImpersonationProvider.class, ImpersonationProvider.class);
miniCluster = new MiniDFSCluster.Builder(conf).build();
miniCluster.waitClusterUp();
dfs = miniCluster.getFileSystem();
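    // Create a world-writable /tmp owned by "hdfs" so that commands replayed
    // as the trace users can operate underneath it.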
dfs.mkdirs(new Path("/tmp"),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
dfs.setOwner(new Path("/tmp"), "hdfs", "hdfs");
}
@AfterEach
public void tearDown() throws Exception {
if (miniCluster != null) {
miniCluster.shutdown();
miniCluster = null;
}
}
@Test
public void testAuditWorkloadDirectParserWithOutput() throws Exception {
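    // Replay the bundled raw NameNode audit log; AuditLogDirectParser is the
    // default command parser, so it does not need to be set explicitly.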
String workloadInputPath = TestWorkloadGenerator.class.getClassLoader()
.getResource("audit_trace_direct").toString();
String auditOutputPath = "/tmp/trace_output_direct";
conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath);
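    // Tell the direct parser when the trace begins (ms since the epoch) so
    // that each entry's timestamp can be translated into a replay offset.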
conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60 * 1000);
testAuditWorkloadWithOutput(auditOutputPath);
}
@Test
public void testAuditWorkloadHiveParserWithOutput() throws Exception {
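    // Same replay, but the input is the pre-parsed form produced by a Hive
    // table export, so the non-default AuditLogHiveTableParser is configured.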
String workloadInputPath =
TestWorkloadGenerator.class.getClassLoader()
.getResource("audit_trace_hive").toString();
String auditOutputPath = "/tmp/trace_output_hive";
conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath);
conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY,
AuditLogHiveTableParser.class, AuditCommandParser.class);
testAuditWorkloadWithOutput(auditOutputPath);
}
  /**
   * An {@link ImpersonationProvider} that authorizes impersonation as long
   * as the real (impersonating) user is the same as the user running the
   * MiniCluster.
   */
private static class AllowUserImpersonationProvider extends Configured
implements ImpersonationProvider {
    @Override
    public void init(String configurationPrefix) {
// Do nothing
}
    @Override
    public void authorize(UserGroupInformation user, InetAddress remoteAddress)
throws AuthorizationException {
try {
if (!user.getRealUser().getShortUserName()
.equals(UserGroupInformation.getCurrentUser().getShortUserName())) {
          throw new AuthorizationException("Impersonating user does not "
              + "match the user running the MiniCluster");
}
} catch (IOException ioe) {
throw new AuthorizationException(ioe);
}
}
}
private void testAuditWorkloadWithOutput(String auditOutputPath)
throws Exception {
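    // Schedule the replay to begin 10 seconds from now so that every mapper
    // starts replaying at the same wall-clock time.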
long workloadStartTime = System.currentTimeMillis() + 10000;
Job workloadJob = WorkloadDriver.getJobForSubmission(conf,
dfs.getUri().toString(), workloadStartTime, AuditReplayMapper.class);
boolean success = workloadJob.waitForCompletion(true);
assertTrue(success, "workload job should succeed");
Counters counters = workloadJob.getCounters();
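    // Verify the replay counters: six commands in total, one of them invalid.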
assertEquals(6,
counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS)
.getValue());
assertEquals(1,
counters
.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS)
.getValue());
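    // Spot-check the namespace side effects of the replayed commands: a file
    // created at /tmp/test1, a directory renamed to /tmp/testDirRenamed, and
    // a denied write that never created /denied.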
assertTrue(dfs.getFileStatus(new Path("/tmp/test1")).isFile());
assertTrue(
dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory());
assertFalse(dfs.exists(new Path("/denied")));
assertTrue(dfs.exists(new Path(auditOutputPath)));
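    // The replay job also writes per-command results under the configured
    // output path; read back the single reduce output file.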
try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath,
"part-r-00000"))) {
String auditOutput = IOUtils.toString(auditOutputFile,
StandardCharsets.UTF_8);
      LOG.info(auditOutput);
      // Matches three lines of the format "hdfs,WRITE,name,count,time".
      // The count group is [13] because each operation is run either 1 or
      // 3 times, but the output order isn't guaranteed.
      assertTrue(auditOutput.matches(
          ".*(hdfs,WRITE,[A-Z]+,[13],[0-9]+\\n){3}.*"));
}
}
}