FederationStateStoreTestUtils.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.store;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS;
import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;

/**
 * Utilities to test the State Store.
 */
public final class FederationStateStoreTestUtils {

  /** The State Store Driver implementation class for testing .*/
  private static final Class<? extends StateStoreDriver>
      FEDERATION_STORE_DRIVER_CLASS_FOR_TEST = StateStoreFileImpl.class;

  private FederationStateStoreTestUtils() {
    // Utility Class
  }

  /**
   * Get the State Store driver implementation for testing.
   *
   * @return Class of the State Store driver implementation.
   */
  public static Class<? extends StateStoreDriver> getTestDriverClass() {
    return FEDERATION_STORE_DRIVER_CLASS_FOR_TEST;
  }

  /**
   * Create a default State Store configuration.
   *
   * @return State Store configuration.
   */
  public static Configuration getStateStoreConfiguration() {
    Class<? extends StateStoreDriver> clazz = getTestDriverClass();
    return getStateStoreConfiguration(clazz);
  }

  /**
   * Create a new State Store configuration for a particular driver.
   *
   * @param clazz Class of the driver to create.
   * @return State Store configuration.
   */
  public static Configuration getStateStoreConfiguration(
      Class<? extends StateStoreDriver> clazz) {
    Configuration conf = new HdfsConfiguration(false);

    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test");

    conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);

    if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) {
      setFileConfiguration(conf);
    }
    return conf;
  }

  /**
   * Create a new State Store based on a configuration.
   *
   * @param configuration Configuration for the State Store.
   * @return New State Store service.
   * @throws IOException If it cannot create the State Store.
   * @throws InterruptedException If we cannot wait for the store to start.
   */
  public static StateStoreService newStateStore(
      Configuration configuration) throws IOException, InterruptedException {

    StateStoreService stateStore = new StateStoreService();
    assertNotNull(stateStore);

    // Set unique identifier, this is normally the router address
    String identifier = UUID.randomUUID().toString();
    stateStore.setIdentifier(identifier);

    stateStore.init(configuration);
    stateStore.start();

    // Wait for state store to connect
    waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10));

    return stateStore;
  }

  /**
   * Wait for the State Store to initialize its driver.
   *
   * @param stateStore State Store.
   * @param timeoutMs Time out in milliseconds.
   * @throws IOException If the State Store cannot be reached.
   * @throws InterruptedException If the sleep is interrupted.
   */
  public static void waitStateStore(StateStoreService stateStore,
      long timeoutMs) throws IOException, InterruptedException {
    long startingTime = Time.monotonicNow();
    while (!stateStore.isDriverReady()) {
      Thread.sleep(100);
      if (Time.monotonicNow() - startingTime > timeoutMs) {
        throw new IOException("Timeout waiting for State Store to connect");
      }
    }
  }

  /**
   * Delete the default State Store.
   *
   * @throws IOException
   */
  public static void deleteStateStore() throws IOException {
    Class<? extends StateStoreDriver> driverClass = getTestDriverClass();
    deleteStateStore(driverClass);
  }

  /**
   * Delete the State Store.
   * @param driverClass Class of the State Store driver implementation.
   * @throws IOException If it cannot be deleted.
   */
  public static void deleteStateStore(
      Class<? extends StateStoreDriver> driverClass) throws IOException {

    if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) {
      String workingDirectory = System.getProperty("user.dir");
      File dir = new File(workingDirectory + "/statestore");
      if (dir.exists()) {
        FileUtils.cleanDirectory(dir);
      }
    }
  }

  /**
   * Set the default configuration for drivers based on files.
   *
   * @param conf Configuration to extend.
   */
  public static void setFileConfiguration(Configuration conf) {
    String stateStorePath = GenericTestUtils.getRandomizedTempPath();
    conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
  }

  /**
   * Clear all the records from the State Store.
   *
   * @param store State Store to remove records from.
   * @return If the State Store was cleared.
   * @throws IOException If it cannot clear the State Store.
   */
  public static boolean clearAllRecords(StateStoreService store)
      throws IOException {
    Collection<Class<? extends BaseRecord>> allRecords =
        store.getSupportedRecords();
    for (Class<? extends BaseRecord> recordType : allRecords) {
      if (!clearRecords(store, recordType)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Clear records from a certain type from the State Store.
   *
   * @param store State Store to remove records from.
   * @param recordClass Class of the records to remove.
   * @return If the State Store was cleared.
   * @throws IOException If it cannot clear the State Store.
   */
  public static <T extends BaseRecord> boolean clearRecords(
      StateStoreService store, Class<T> recordClass) throws IOException {
    List<T> emptyList = new ArrayList<>();
    if (!synchronizeRecords(store, emptyList, recordClass)) {
      return false;
    }
    store.refreshCaches(true);
    return true;
  }

  /**
   * Synchronize a set of records. Remove all and keep the ones specified.
   *
   * @param stateStore State Store service managing the driver.
   * @param records Records to add.
   * @param clazz Class of the record to synchronize.
   * @return If the synchronization succeeded.
   * @throws IOException If it cannot connect to the State Store.
   */
  public static <T extends BaseRecord> boolean synchronizeRecords(
      StateStoreService stateStore, List<T> records, Class<T> clazz)
          throws IOException {
    StateStoreDriver driver = stateStore.getDriver();
    driver.verifyDriverReady();
    if (driver.removeAll(clazz)) {
      return driver.putAll(records, true, false).isOperationSuccessful();
    }
    return false;
  }

  public static List<MountTable> createMockMountTable(
      List<String> nameservices) throws IOException {
    // create table entries
    List<MountTable> entries = new ArrayList<>();
    for (String ns : nameservices) {
      Map<String, String> destMap = new HashMap<>();
      destMap.put(ns, "/target-" + ns);
      MountTable entry = MountTable.newInstance("/" + ns, destMap);
      entries.add(entry);
    }
    return entries;
  }

  public static MembershipState createMockRegistrationForNamenode(
      String nameserviceId, String namenodeId,
      FederationNamenodeServiceState state) throws IOException {
    MembershipState entry = MembershipState.newInstance(
        "routerId", nameserviceId, namenodeId, "clusterId", "test",
        "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "http", "0.0.0.0:0",
        state, false);
    MembershipStats stats = MembershipStats.newInstance();
    stats.setNumOfActiveDatanodes(100);
    stats.setNumOfDeadDatanodes(10);
    stats.setNumOfDecommissioningDatanodes(20);
    stats.setNumOfDecomActiveDatanodes(15);
    stats.setNumOfDecomDeadDatanodes(5);
    stats.setNumOfBlocks(10);
    stats.setPendingSPSPaths(10);
    entry.setStats(stats);
    return entry;
  }
}