// HdfsCompatEnvironment.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.compat.common;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
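
/**
 * Shared runtime environment for the HDFS compatibility benchmark.
 * It resolves the target {@link FileSystem} and the local file system,
 * derives a unique per-run base directory path, and exposes a few
 * scheme-specific configuration overrides used by the benchmark.
 *
 * <p>A minimal usage sketch (the target URI below is illustrative only):
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   HdfsCompatEnvironment env =
 *       new HdfsCompatEnvironment(new Path("hdfs://ns1/"), conf);
 *   env.init();
 *   FileSystem fs = env.getFileSystem();
 *   Path base = env.getBase();
 * }</pre>
 */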
public class HdfsCompatEnvironment {
  private static final Logger LOG =
      LoggerFactory.getLogger(HdfsCompatEnvironment.class);
  private static final String DATE_FORMAT = "yyyy_MM_dd_HH_mm_ss";
  private static final Random RANDOM = new Random();

  private final Path uri;
  private final Configuration conf;
  private FileSystem fs;
  private LocalFileSystem localFs;
  private Path rootDir;
  private Path baseDir;
  private String defaultLocalDir;
  private String[] defaultStoragePolicyNames;

  public HdfsCompatEnvironment(Path uri, Configuration conf) {
    this.conf = conf;
    this.uri = uri;
  }
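
  /**
   * Initializes the environment: resolves the target and local file
   * systems, and derives a unique per-run directory name of the form
   * {@code hadoop-compatibility-benchmark/<timestamp>/<uuid>} that is
   * used both as the base path under the target URI and as the default
   * local temporary directory.
   *
   * @throws IOException if the target or local file system cannot be obtained
   */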
  public void init() throws IOException {
    Date now = new Date();
    String uuid = UUID.randomUUID().toString();
    String uniqueDir = "hadoop-compatibility-benchmark/" +
        new SimpleDateFormat(DATE_FORMAT).format(now) + "/" + uuid;
    this.fs = uri.getFileSystem(conf);
    this.localFs = FileSystem.getLocal(conf);
    this.rootDir = fs.makeQualified(new Path("/"));
    this.baseDir = fs.makeQualified(new Path(uri, uniqueDir));
    String tmpdir = getEnvTmpDir();
    if ((tmpdir == null) || tmpdir.isEmpty()) {
      LOG.warn("Cannot get valid io.tmpdir, will use /tmp");
      tmpdir = "/tmp";
    }
    this.defaultLocalDir = new File(tmpdir, uniqueDir).getAbsolutePath();
    this.defaultStoragePolicyNames = getDefaultStoragePolicyNames();
  }

  public FileSystem getFileSystem() {
    return fs;
  }

  public LocalFileSystem getLocalFileSystem() {
    return localFs;
  }

  public Path getRoot() {
    return rootDir;
  }

  public Path getBase() {
    return baseDir;
  }
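
  /**
   * Returns the local temporary directory for this run. If
   * {@code fs.<scheme>.compatibility.local.tmpdir} is set (for example
   * {@code fs.hdfs.compatibility.local.tmpdir} for an hdfs URI), that
   * value is used; otherwise the per-run default derived from
   * {@code java.io.tmpdir} is returned.
   */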
  public String getLocalTmpDir() {
    final String scheme = this.uri.toUri().getScheme();
    final String key = "fs." + scheme + ".compatibility.local.tmpdir";
    final String localDir = conf.get(key, null);
    return (localDir != null) ? localDir : defaultLocalDir;
  }
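
  /**
   * Returns the user regarded as privileged for the target file system.
   * If {@code fs.<scheme>.compatibility.privileged.user} is set, that
   * value is used; otherwise the value of
   * {@link DFSConfigKeys#DFS_PERMISSIONS_SUPERUSERGROUP_KEY} (the HDFS
   * superuser group setting) is returned.
   */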
  public String getPrivilegedUser() {
    final String scheme = this.uri.toUri().getScheme();
    final String key = "fs." + scheme + ".compatibility.privileged.user";
    final String privileged = conf.get(key, null);
    return (privileged != null) ? privileged :
        conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
            DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
  }
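
  /**
   * Returns the storage policy names used by the benchmark. If
   * {@code fs.<scheme>.compatibility.storage.policies} is set, it is
   * interpreted as a comma-separated list of policy names; otherwise the
   * policies reported by the target file system (or {@code Hot} as a
   * last resort) are returned.
   */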
  public String[] getStoragePolicyNames() {
    final String scheme = this.uri.toUri().getScheme();
    final String key = "fs." + scheme + ".compatibility.storage.policies";
    final String storagePolicies = conf.get(key, null);
    return (storagePolicies != null) ? storagePolicies.split(",") :
        defaultStoragePolicyNames.clone();
  }
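
  /**
   * Returns the delegation token renewer configured via
   * {@code fs.<scheme>.compatibility.delegation.token.renewer}, or an
   * empty string if not set.
   */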
  public String getDelegationTokenRenewer() {
    final String scheme = this.uri.toUri().getScheme();
    final String key = "fs." + scheme + ".compatibility.delegation.token.renewer";
    return conf.get(key, "");
  }
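
  /**
   * Resolves a usable temporary directory from {@code java.io.tmpdir}.
   * The property value may list several directories separated by commas
   * or the platform path separator; one non-empty entry is picked at
   * random. Returns {@code null} if no usable entry is found.
   */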
  private String getEnvTmpDir() {
    final String systemDefault = System.getProperty("java.io.tmpdir");
    if ((systemDefault == null) || systemDefault.isEmpty()) {
      return null;
    }
    String[] tmpDirs = systemDefault.split(",|" + File.pathSeparator);
    List<String> validDirs = Arrays.stream(tmpDirs).filter(
        s -> (s != null && !s.isEmpty())
    ).collect(Collectors.toList());
    if (validDirs.isEmpty()) {
      return null;
    }
    final String tmpDir = validDirs.get(
        RANDOM.nextInt(validDirs.size()));
    return new File(tmpDir).getAbsolutePath();
  }
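
  /**
   * Queries the target file system for its storage policies via
   * {@link FileSystem#getAllStoragePolicies()}. If the call fails or
   * returns nothing (for example when the file system does not support
   * storage policies), falls back to a single {@code Hot} policy.
   */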
  private String[] getDefaultStoragePolicyNames() {
    Collection<? extends BlockStoragePolicySpi> policies = null;
    try {
      policies = fs.getAllStoragePolicies();
    } catch (Exception e) {
      LOG.warn("Cannot get storage policy", e);
    }
    if ((policies == null) || policies.isEmpty()) {
      return new String[]{"Hot"};
    } else {
      return policies.stream().map(BlockStoragePolicySpi::getName)
          .toArray(String[]::new);
    }
  }
}