StateStoreDFSCluster.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation;

import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.StringUtils;

/**
 * Test utility to mimic a federated HDFS cluster with a router and a state
 * store.
 */
public class StateStoreDFSCluster extends MiniRouterDFSCluster {

  private static final Class<?> DEFAULT_FILE_RESOLVER =
      MountTableResolver.class;
  private static final Class<?> DEFAULT_NAMENODE_RESOLVER =
      MembershipNamenodeResolver.class;

  public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
      long heartbeatInterval, long cacheFlushInterval)
          throws IOException, InterruptedException {
    this(ha, numNameservices, numNamenodes, heartbeatInterval,
        cacheFlushInterval, DEFAULT_FILE_RESOLVER);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
      long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver)
          throws IOException, InterruptedException {
    super(ha, numNameservices, numNamenodes, heartbeatInterval,
        cacheFlushInterval);

    // Attach state store and resolvers to router
    Configuration stateStoreConfig = getStateStoreConfiguration();
    // Use state store backed resolvers
    stateStoreConfig.setClass(
        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
        DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class);
    stateStoreConfig.setClass(
        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
        fileResolver, FileSubclusterResolver.class);
    this.addRouterOverrides(stateStoreConfig);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices,
      Class<?> fileResolver) throws IOException, InterruptedException {
    this(ha, numNameservices, 2,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices)
      throws IOException, InterruptedException {
    this(ha, numNameservices, 2,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices,
      int numNamnodes) throws IOException, InterruptedException {
    this(ha, numNameservices, numNamnodes,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
  }

  /////////////////////////////////////////////////////////////////////////////
  // State Store Test Fixtures
  /////////////////////////////////////////////////////////////////////////////

  /**
   * Adds test fixtures for NN registation for each NN nameservice -> NS
   * namenode -> NN rpcAddress -> 0.0.0.0:0 webAddress -> 0.0.0.0:0 state ->
   * STANDBY safeMode -> false blockPool -> test.
   *
   * @param stateStore State Store.
   * @throws IOException If it cannot register.
   */
  public void createTestRegistration(StateStoreService stateStore)
      throws IOException {
    List<MembershipState> entries = new ArrayList<MembershipState>();
    for (NamenodeContext nn : this.getNamenodes()) {
      MembershipState entry = createMockRegistrationForNamenode(
          nn.getNameserviceId(), nn.getNamenodeId(),
          FederationNamenodeServiceState.STANDBY);
      entries.add(entry);
    }
    synchronizeRecords(
        stateStore, entries, MembershipState.class);
  }

  public void createTestMountTable(StateStoreService stateStore)
      throws IOException {
    List<MountTable> mounts = generateMockMountTable();
    synchronizeRecords(stateStore, mounts, MountTable.class);
    stateStore.refreshCaches();
  }

  public List<MountTable> generateMockMountTable() throws IOException {
    // create table entries
    List<MountTable> entries = new ArrayList<>();
    for (String ns : this.getNameservices()) {
      Map<String, String> destMap = new HashMap<>();
      destMap.put(ns, getNamenodePathForNS(ns));

      // Direct path
      String fedPath = getFederatedPathForNS(ns);
      MountTable entry = MountTable.newInstance(fedPath, destMap);
      entries.add(entry);
    }

    // Root path goes to nameservice 1
    Map<String, String> destMap = new HashMap<>();
    String ns0 = this.getNameservices().get(0);
    destMap.put(ns0, "/");
    MountTable entry = MountTable.newInstance("/", destMap);
    entries.add(entry);
    return entries;
  }

  /**
   * Get the client configuration which targets all the Routers. It uses the HA
   * setup to fails over between them.
   * @return Configuration for the client which uses two routers.
   */
  public Configuration getRouterClientConf() {
    List<RouterContext> routers = getRouters();
    Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
    int i = 0;
    List<String> names = new ArrayList<>(routers.size());
    for (RouterContext routerContext : routers) {
      String name = "r" + i++;
      clientConf.set(
          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
          "localhost:" + routerContext.getRpcPort());
      names.add(name);
    }
    clientConf.set(DFSUtil.addKeySuffixes(
        HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
        StringUtils.join(",", names));
    return clientConf;
  }
}