/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.NullGroupsMapping;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.After;
import org.junit.AfterClass;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Test the {@link RollingFileSystemSink} class in the context of HDFS with
 * Kerberos enabled.
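 * <p>
 * The test provisions a {@link MiniKdc}, creates keytabs for an HDFS
 * superuser and a separate sink user, and then runs a
 * {@link MiniDFSCluster} with Kerberos authentication, SASL-protected data
 * transfer, and HTTPS-only web endpoints.
 * <p>
 * Outside of tests, a sink such as this one is typically wired up through
 * hadoop-metrics2.properties. A minimal sketch, in which the instance name
 * {@code rfs} and the base path are illustrative only:
 * <pre>
 *   *.sink.rfs.class=org.apache.hadoop.metrics2.sink.RollingFileSystemSink
 *   namenode.sink.rfs.basepath=hdfs://nn.example.com:8020/metrics
 * </pre>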
 */
public class TestRollingFileSystemSinkWithSecureHdfs
    extends RollingFileSystemSinkTestBase {
  private static final int NUM_DATANODES = 4;
  private static MiniKdc kdc;
  private static String sinkPrincipal;
  private static String sinkKeytab;
  private static String hdfsPrincipal;
  private static String hdfsKeytab;
  private static String spnegoPrincipal;
  private MiniDFSCluster cluster = null;
  private UserGroupInformation sink = null;

  /**
   * Set up the KDC for testing a secure HDFS cluster.
   *
   * @throws Exception thrown if the KDC setup fails
   */
  @BeforeClass
  public static void initKdc() throws Exception {
    Properties kdcConf = MiniKdc.createConf();
    kdc = new MiniKdc(kdcConf, ROOT_TEST_DIR);
    kdc.start();

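    // Provision a dedicated principal and keytab for the sink user; the
    // sink logs in from this keytab before writing metrics to the cluster.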
    File sinkKeytabFile = new File(ROOT_TEST_DIR, "sink.keytab");
    sinkKeytab = sinkKeytabFile.getAbsolutePath();
    kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
    sinkPrincipal = "sink/localhost@" + kdc.getRealm();

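    // The HDFS service principal and the HTTP/SPNEGO principal share one
    // keytab, which is handed to both the NameNode and the DataNodes in
    // createSecureConfig().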
    File hdfsKeytabFile = new File(ROOT_TEST_DIR, "hdfs.keytab");
    hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
    kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
        "HTTP/localhost");
    hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
    spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
  }

  /**
   * Set up the mini-DFS cluster.
   *
   * @throws Exception thrown if the cluster setup fails
   */
  @Before
  public void initCluster() throws Exception {
    HdfsConfiguration conf = createSecureConfig("authentication,privacy");

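    // Reset the sink's flush flag and inject the secure configuration
    // through its test-visible static hooks so the sink talks to the
    // Kerberized mini-cluster.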
    RollingFileSystemSink.hasFlushed = false;
    RollingFileSystemSink.suppliedConf = conf;

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
        .build();
    cluster.waitActive();
    createDirectoriesSecurely();
  }

  /**
   * Stop the mini-DFS cluster.
   */
  @After
  public void stopCluster() {
    if (cluster != null) {
      cluster.shutdown();
    }

    // Restore non-secure conf
    UserGroupInformation.setConfiguration(new Configuration());
    RollingFileSystemSink.suppliedConf = null;
    RollingFileSystemSink.suppliedFilesystem = null;
  }

  /**
   * Stop the mini-KDC.
   */
  @AfterClass
  public static void shutdownKdc() {
    if (kdc != null) {
      kdc.stop();
    }
  }

  /**
   * Do a basic write test against an HDFS cluster with Kerberos enabled. We
   * assume that if a basic write succeeds, more complex operations will also
   * succeed.
   *
   * @throws Exception thrown if the test fails
   */
  @Test
  public void testWithSecureHDFS() throws Exception {
    final String path =
        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
    final MetricsSystem ms =
        initMetricsSystem(path, true, false, true);

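    // Perform the write as the sink user so that the only credentials in
    // play are the ones obtained from the sink's keytab.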
    assertMetricsContents(
        sink.doAs(new PrivilegedExceptionAction<String>() {
          @Override
          public String run() throws Exception {
            return doWriteTest(ms, path, 1);
          }
        }));
  }

  /**
   * Do a basic write test against an HDFS cluster with Kerberos enabled but
   * without the principal and keytab properties set.
   *
   * @throws Exception thrown if the test fails
   */
  @Test
  public void testMissingPropertiesWithSecureHDFS() throws Exception {
    final String path =
        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";

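    // Initialize the sink without the principal and keytab properties;
    // against a secure cluster, initialization is expected to fail.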
    initMetricsSystem(path, true, false);

    assertTrue("No exception was generated initializing the sink against a "
        + "secure cluster even though the principal and keytab properties "
        + "were missing", MockSink.errored);
  }

  /**
   * Create the /tmp directory as <i>hdfs</i> and /tmp/test as <i>sink</i>,
   * and store the UGI for <i>sink</i> in {@link #sink}.
   *
   * @throws IOException thrown if login or directory creation fails
   * @throws InterruptedException thrown if interrupted while creating a
   * file system handle
   */
  protected void createDirectoriesSecurely()
      throws IOException, InterruptedException {
    Path tmp = new Path("/tmp");
    Path test = new Path(tmp, "test");

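    // A FileSystem handle is tied to the UGI that creates it, so obtain the
    // superuser's handle inside a doAs block and use it to create /tmp.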
    UserGroupInformation hdfs =
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(hdfsPrincipal,
            hdfsKeytab);
    FileSystem fsForSuperUser =
        hdfs.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws Exception {
            return cluster.getFileSystem();
          }
        });

    fsForSuperUser.mkdirs(tmp);
    fsForSuperUser.setPermission(tmp, new FsPermission((short)0777));

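    // Do the same as the sink user so that /tmp/test is owned by the
    // identity the sink will write with.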
    sink = UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
            sinkKeytab);

    FileSystem fsForSink =
        sink.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws Exception {
            return cluster.getFileSystem();
          }
        });

    fsForSink.mkdirs(test);
    RollingFileSystemSink.suppliedFilesystem = fsForSink;
  }

  /**
   * Creates a configuration for starting a secure cluster.
   *
   * @param dataTransferProtection comma-separated list of SASL QOP values
   * to support, e.g. "authentication,privacy"
   * @return configuration for starting a secure cluster
   * @throws Exception if there is any failure
   */
  protected HdfsConfiguration createSecureConfig(
      String dataTransferProtection) throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();

    SecurityUtil.setAuthenticationMethod(
        UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
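    // Kerberos identities: the NameNode and DataNodes share the HDFS
    // principal and keytab, and the sink gets its own identity.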
    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, hdfsKeytab);
    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, hdfsKeytab);
    conf.set(SINK_PRINCIPAL_KEY, sinkPrincipal);
    conf.set(SINK_KEYTAB_FILE_KEY, sinkKeytab);
    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
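    // Block access tokens plus SASL on the data transfer path let the
    // DataNodes run securely on unprivileged (ephemeral) ports, with all
    // web endpoints served over HTTPS only.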
    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_GROUP_MAPPING,
        NullGroupsMapping.class.getName());

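    // Generate self-signed certificates and the ssl-client/ssl-server
    // configuration files that back the HTTPS endpoints.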
    String keystoresDir = methodDir.getAbsolutePath();
    String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());

    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
    conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
        KeyStoreTestUtil.getClientSSLConfigFileName());
    conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
        KeyStoreTestUtil.getServerSSLConfigFileName());

    return conf;
  }
}