HdfsCompatMiniCluster.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.compat.hdfs;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;


public class HdfsCompatMiniCluster {
  private static final Logger LOG =
      LoggerFactory.getLogger(HdfsCompatMiniCluster.class);

  private MiniDFSCluster cluster = null;

  public HdfsCompatMiniCluster() {
  }

  public synchronized void start() throws IOException {
    FileSystem.enableSymlinks();
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, "true");
    conf.set(DFSConfigKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
        "kms://http@localhost:9600/kms/foo");
    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, "external");
    conf.set(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, "true");
    conf.set("fs.hdfs.compatibility.privileged.user",
        UserGroupInformation.getCurrentUser().getShortUserName());
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitClusterUp();
  }

  public synchronized void shutdown() {
    if (cluster != null) {
      cluster.shutdown(true);
      cluster = null;
    }
  }

  public synchronized Configuration getConf() throws IOException {
    if (cluster == null) {
      throw new IOException("Cluster not running");
    }
    return cluster.getFileSystem().getConf();
  }

  public synchronized URI getUri() throws IOException {
    if (cluster == null) {
      throw new IOException("Cluster not running");
    }
    return cluster.getFileSystem().getUri();
  }

  public static void main(String[] args)
      throws IOException, InterruptedException {
    long duration = 5L * 60L * 1000L;
    if ((args != null) && (args.length > 0)) {
      duration = Long.parseLong(args[0]);
    }

    HdfsCompatMiniCluster cluster = new HdfsCompatMiniCluster();
    try {
      cluster.start();
      Configuration conf = cluster.getConf();

      final String confDir = System.getenv("HADOOP_CONF_DIR");
      final File confFile = new File(confDir, "core-site.xml");
      try (OutputStream out = new FileOutputStream(confFile)) {
        conf.writeXml(out);
      }

      final long endTime = System.currentTimeMillis() + duration;
      long sleepTime = getSleepTime(endTime);
      while (sleepTime > 0) {
        LOG.warn("Service running ...");
        Thread.sleep(sleepTime);
        sleepTime = getSleepTime(endTime);
      }
    } finally {
      cluster.shutdown();
    }
  }

  private static long getSleepTime(long endTime) {
    long maxTime = endTime - System.currentTimeMillis();
    return (maxTime < 5000) ? maxTime : 5000;
  }
}