HdfsCompatShellScope.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.compat.common;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;
public class HdfsCompatShellScope {
private static final Logger LOG =
LoggerFactory.getLogger(HdfsCompatShellScope.class);
private static final Random RANDOM = new Random();
private final HdfsCompatEnvironment env;
private final HdfsCompatSuite suite;
private File stdoutDir = null;
private File passList = null;
private File failList = null;
private File skipList = null;
private Path snapshotPath = null;
private String storagePolicy = null;
private Method disallowSnapshot = null;
public HdfsCompatShellScope(HdfsCompatEnvironment env, HdfsCompatSuite suite) {
this.env = env;
this.suite = suite;
}
public HdfsCompatReport apply() throws Exception {
File localTmpDir = null;
try {
localTmpDir = new File(this.env.getLocalTmpDir());
LOG.info("Local tmp dir: " + localTmpDir.getAbsolutePath());
return runShell(localTmpDir);
} finally {
try {
if (this.disallowSnapshot != null) {
try {
this.disallowSnapshot.invoke(this.env.getFileSystem(),
this.snapshotPath);
} catch (InvocationTargetException e) {
LOG.error("Cannot disallow snapshot", e.getCause());
} catch (ReflectiveOperationException e) {
LOG.error("Disallow snapshot method is invalid", e);
}
}
} finally {
FileUtils.deleteQuietly(localTmpDir);
}
}
}
private HdfsCompatReport runShell(File localTmpDir) throws Exception {
File localDir = new File(localTmpDir, "test");
File scriptDir = new File(localTmpDir, "scripts");
File confDir = new File(localTmpDir, "hadoop-conf");
copyScriptsResource(scriptDir);
try {
setShellLogConf(confDir);
} catch (Exception e) {
LOG.error("Cannot set new conf dir", e);
confDir = null;
}
prepareSnapshot();
this.storagePolicy = getStoragePolicy();
String[] confEnv = getEnv(localDir, scriptDir, confDir);
ExecResult result = exec(confEnv, scriptDir);
printLog(result);
return export();
}
private void copyScriptsResource(File scriptDir) throws IOException {
Files.createDirectories(new File(scriptDir, "cases").toPath());
copyResource("/misc.sh", new File(scriptDir, "misc.sh"));
String[] cases = suite.getShellCases();
for (String res : cases) {
copyResource("/cases/" + res, new File(scriptDir, "cases/" + res));
}
}
private void setShellLogConf(File confDir) throws IOException {
final String hadoopHome = System.getenv("HADOOP_HOME");
final String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
if ((hadoopHome == null) || hadoopHome.isEmpty()) {
LOG.error("HADOOP_HOME not configured");
}
if ((hadoopConfDir == null) || hadoopConfDir.isEmpty()) {
throw new IOException("HADOOP_CONF_DIR not configured");
}
File srcDir = new File(hadoopConfDir).getAbsoluteFile();
if (!srcDir.isDirectory()) {
throw new IOException("HADOOP_CONF_DIR is not valid: " + srcDir);
}
Files.createDirectories(confDir.toPath());
FileUtils.copyDirectory(srcDir, confDir);
File logConfFile = new File(confDir, "log4j.properties");
copyResource("/hadoop-compat-bench-log4j.properties", logConfFile, true);
}
@VisibleForTesting
protected void copyResource(String res, File dst) throws IOException {
copyResource(res, dst, false);
}
private void copyResource(String res, File dst, boolean overwrite)
throws IOException {
InputStream in = null;
try {
in = this.getClass().getResourceAsStream(res);
if (in == null) {
in = this.suite.getClass().getResourceAsStream(res);
}
if (in == null) {
throw new IOException("Resource not found" +
" during scripts prepare: " + res);
}
if (dst.exists() && !overwrite) {
throw new IOException("Cannot overwrite existing resource file");
}
Files.createDirectories(dst.getParentFile().toPath());
byte[] buf = new byte[1024];
try (OutputStream out = new FileOutputStream(dst)) {
int nRead = in.read(buf);
while (nRead != -1) {
out.write(buf, 0, nRead);
nRead = in.read(buf);
}
}
} finally {
if (in != null) {
in.close();
}
}
}
private void prepareSnapshot() {
this.snapshotPath = AbstractHdfsCompatCase.getUniquePath(this.env.getBase());
Method allowSnapshot = null;
try {
FileSystem fs = this.env.getFileSystem();
fs.mkdirs(snapshotPath);
Method allowSnapshotMethod = fs.getClass()
.getMethod("allowSnapshot", Path.class);
allowSnapshotMethod.setAccessible(true);
allowSnapshotMethod.invoke(fs, snapshotPath);
allowSnapshot = allowSnapshotMethod;
Method disallowSnapshotMethod = fs.getClass()
.getMethod("disallowSnapshot", Path.class);
disallowSnapshotMethod.setAccessible(true);
this.disallowSnapshot = disallowSnapshotMethod;
} catch (IOException e) {
LOG.error("Cannot prepare snapshot path", e);
} catch (InvocationTargetException e) {
LOG.error("Cannot allow snapshot", e.getCause());
} catch (ReflectiveOperationException e) {
LOG.warn("Get admin snapshot methods failed.");
} catch (Exception e) {
LOG.warn("Prepare snapshot failed", e);
}
if (allowSnapshot == null) {
LOG.warn("No allowSnapshot method found.");
}
if (this.disallowSnapshot == null) {
LOG.warn("No disallowSnapshot method found.");
}
}
private String getStoragePolicy() {
BlockStoragePolicySpi def;
String[] policies;
try {
FileSystem fs = this.env.getFileSystem();
Path base = this.env.getBase();
fs.mkdirs(base);
def = fs.getStoragePolicy(base);
policies = env.getStoragePolicyNames();
} catch (Exception e) {
LOG.warn("Cannot get storage policy", e);
return "Hot";
}
List<String> differentPolicies = new ArrayList<>();
for (String policyName : policies) {
if ((def == null) || !policyName.equalsIgnoreCase(def.getName())) {
differentPolicies.add(policyName);
}
}
if (differentPolicies.isEmpty()) {
final String defPolicyName;
if ((def == null) || (def.getName() == null)) {
defPolicyName = "Hot";
LOG.warn("No valid storage policy name found, use Hot.");
} else {
defPolicyName = def.getName();
LOG.warn("There is only one storage policy: " + defPolicyName);
}
return defPolicyName;
} else {
return differentPolicies.get(
RANDOM.nextInt(differentPolicies.size()));
}
}
@VisibleForTesting
protected String[] getEnv(File localDir, File scriptDir, File confDir)
throws IOException {
List<String> confEnv = new ArrayList<>();
final Map<String, String> environments = System.getenv();
for (Map.Entry<String, String> entry : environments.entrySet()) {
confEnv.add(entry.getKey() + "=" + entry.getValue());
}
if (confDir != null) {
confEnv.add("HADOOP_CONF_DIR=" + confDir.getAbsolutePath());
}
String timestamp = String.valueOf(System.currentTimeMillis());
Path baseUri = new Path(this.env.getBase(), timestamp);
File localUri = new File(localDir, timestamp).getAbsoluteFile();
File resultDir = new File(localDir, timestamp);
Files.createDirectories(resultDir.toPath());
this.stdoutDir = new File(resultDir, "output").getAbsoluteFile();
this.passList = new File(resultDir, "passed").getAbsoluteFile();
this.failList = new File(resultDir, "failed").getAbsoluteFile();
this.skipList = new File(resultDir, "skipped").getAbsoluteFile();
Files.createFile(this.passList.toPath());
Files.createFile(this.failList.toPath());
Files.createFile(this.skipList.toPath());
final String prefix = "HADOOP_COMPAT_";
confEnv.add(prefix + "BASE_URI=" + baseUri);
confEnv.add(prefix + "LOCAL_URI=" + localUri.getAbsolutePath());
confEnv.add(prefix + "SNAPSHOT_URI=" + snapshotPath.toString());
confEnv.add(prefix + "STORAGE_POLICY=" + storagePolicy);
confEnv.add(prefix + "STDOUT_DIR=" + stdoutDir.getAbsolutePath());
confEnv.add(prefix + "PASS_FILE=" + passList.getAbsolutePath());
confEnv.add(prefix + "FAIL_FILE=" + failList.getAbsolutePath());
confEnv.add(prefix + "SKIP_FILE=" + skipList.getAbsolutePath());
return confEnv.toArray(new String[0]);
}
private ExecResult exec(String[] confEnv, File scriptDir)
throws IOException, InterruptedException {
Process process = Runtime.getRuntime().exec(
"prove -r cases", confEnv, scriptDir);
StreamPrinter out = new StreamPrinter(process.getInputStream());
StreamPrinter err = new StreamPrinter(process.getErrorStream());
out.start();
err.start();
int code = process.waitFor();
out.join();
err.join();
return new ExecResult(code, out.lines, err.lines);
}
private void printLog(ExecResult execResult) {
LOG.info("Shell prove\ncode: {}\nstdout:\n\t{}\nstderr:\n\t{}",
execResult.code, String.join("\n\t", execResult.out),
String.join("\n\t", execResult.err));
File casesRoot = new File(stdoutDir, "cases").getAbsoluteFile();
String[] casesDirList = casesRoot.list();
if (casesDirList == null) {
LOG.error("stdout/stderr root directory is invalid: " + casesRoot);
return;
}
Arrays.sort(casesDirList, (o1, o2) -> {
if (o1.length() == o2.length()) {
return o1.compareTo(o2);
} else {
return o1.length() - o2.length();
}
});
for (String casesDir : casesDirList) {
printCasesLog(new File(casesRoot, casesDir).getAbsoluteFile());
}
}
private void printCasesLog(File casesDir) {
File stdout = new File(casesDir, "stdout").getAbsoluteFile();
File stderr = new File(casesDir, "stderr").getAbsoluteFile();
File[] stdoutFiles = stdout.listFiles();
File[] stderrFiles = stderr.listFiles();
Set<String> cases = new HashSet<>();
if (stdoutFiles != null) {
for (File c : stdoutFiles) {
cases.add(c.getName());
}
}
if (stderrFiles != null) {
for (File c : stderrFiles) {
cases.add(c.getName());
}
}
String[] caseNames = cases.stream().sorted((o1, o2) -> {
if (o1.length() == o2.length()) {
return o1.compareTo(o2);
} else {
return o1.length() - o2.length();
}
}).toArray(String[]::new);
for (String caseName : caseNames) {
File stdoutFile = new File(stdout, caseName);
File stderrFile = new File(stderr, caseName);
try {
List<String> stdoutLines = stdoutFile.exists() ?
readLines(stdoutFile) : new ArrayList<>();
List<String> stderrLines = stderrFile.exists() ?
readLines(stderrFile) : new ArrayList<>();
LOG.info("Shell case {} - #{}\nstdout:\n\t{}\nstderr:\n\t{}",
casesDir.getName(), caseName,
String.join("\n\t", stdoutLines),
String.join("\n\t", stderrLines));
} catch (Exception e) {
LOG.warn("Read shell stdout or stderr file failed", e);
}
}
}
private HdfsCompatReport export() throws IOException {
HdfsCompatReport report = new HdfsCompatReport();
report.addPassedCase(readLines(this.passList));
report.addFailedCase(readLines(this.failList));
report.addSkippedCase(readLines(this.skipList));
return report;
}
private List<String> readLines(File file) throws IOException {
List<String> lines = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new InputStreamReader(
new FileInputStream(file), StandardCharsets.UTF_8))) {
String line = br.readLine();
while (line != null) {
lines.add(line);
line = br.readLine();
}
}
return lines;
}
private static final class StreamPrinter extends Thread {
private final InputStream in;
private final List<String> lines;
private StreamPrinter(InputStream in) {
this.in = in;
this.lines = new ArrayList<>();
}
@Override
public void run() {
try (BufferedReader br = new BufferedReader(
new InputStreamReader(in, StandardCharsets.UTF_8))) {
String line = br.readLine();
while (line != null) {
this.lines.add(line);
line = br.readLine();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static final class ExecResult {
private final int code;
private final List<String> out;
private final List<String> err;
private ExecResult(int code, List<String> out, List<String> err) {
this.code = code;
this.out = out;
this.err = err;
}
}
}