FederationStateStoreFacade.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.federation.utils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Random;
import java.util.Collection;

import javax.cache.integration.CacheLoaderException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.cache.FederationCache;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;

import static org.apache.hadoop.yarn.server.federation.cache.FederationCache.buildPolicyConfigMap;
import static org.apache.hadoop.yarn.server.federation.cache.FederationCache.buildSubClusterInfoMap;

/**
 *
 * The FederationStateStoreFacade is a utility wrapper that provides singleton
 * access to the Federation state store. It abstracts out retries and, in
 * addition, implements caching for various objects.
 *
 */
public final class FederationStateStoreFacade {
  // Class-wide SLF4J logger.
  private static final Logger LOG =
      LoggerFactory.getLogger(FederationStateStoreFacade.class);

  // Lazily created singleton; declared volatile because it is read outside
  // the synchronized block in generateStateStoreFacade.
  private static volatile FederationStateStoreFacade facade;

  // Shared random source, seeded with the wall-clock time at class
  // initialization.
  private static Random rand = new Random(System.currentTimeMillis());

  // State store client: created behind a retry proxy by
  // initializeFacadeInternal, or supplied directly through reinitialize.
  private FederationStateStore stateStore;
  // Configuration the facade was last (re)initialized with.
  private Configuration conf;
  // Resolver instantiated and loaded in initializeFacadeInternal.
  private SubClusterResolver subclusterResolver;
  // Cache consulted by the read paths whenever isCachingEnabled() is true.
  private FederationCache federationCache;

  /**
   * Creates the facade and initializes it from the given configuration.
   * The constructor is private; instances are handed out through
   * {@link #getInstance()} and {@link #getInstance(Configuration)}.
   *
   * @param conf the configuration passed on to the internal initialization
   */
  private FederationStateStoreFacade(Configuration conf) {
    initializeFacadeInternal(conf);
  }

  /**
   * Builds the collaborators of the facade from the configuration: the state
   * store client (wrapped in a retry proxy), the sub-cluster resolver and the
   * federation cache.
   *
   * @param config the configuration naming the implementation classes
   * @throws RuntimeException wrapping the {@link YarnException} raised by any
   *           of the initialization steps
   */
  private void initializeFacadeInternal(Configuration config) {
    this.conf = config;
    try {
      // State store client, created behind a retry proxy and then initialized.
      RetryPolicy storeRetryPolicy = createRetryPolicy(config);
      Object storeProxy = createRetryInstance(config,
          YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
          YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
          FederationStateStore.class, storeRetryPolicy);
      this.stateStore = (FederationStateStore) storeProxy;
      this.stateStore.init(config);

      // Sub-cluster resolver, loaded right after instantiation.
      SubClusterResolver resolver = createInstance(config,
          YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS,
          YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS,
          SubClusterResolver.class);
      this.subclusterResolver = resolver;
      resolver.load();

      // Cache implementation: taken from the configuration, falling back to
      // the default class when the key is not set.
      FederationCache cache = createInstance(config,
          YarnConfiguration.FEDERATION_FACADE_CACHE_CLASS,
          YarnConfiguration.DEFAULT_FEDERATION_FACADE_CACHE_CLASS,
          FederationCache.class);
      this.federationCache = cache;
      cache.initCache(config, this.stateStore);
    } catch (YarnException ex) {
      LOG.error("Failed to initialize the FederationStateStoreFacade object", ex);
      throw new RuntimeException(ex);
    }
  }

  /**
   * Swaps in the given state store and configuration, then clears the cache
   * and initializes it again so that it works against the new pair.
   *
   * @param store the {@link FederationStateStore} instance to use from now on
   * @param config the configuration to use from now on
   */
  @VisibleForTesting
  public synchronized void reinitialize(FederationStateStore store,
      Configuration config) {
    this.stateStore = store;
    this.conf = config;
    // Drop whatever was cached against the previous store before rebuilding.
    this.federationCache.clearCache();
    this.federationCache.initCache(config, store);
  }

  /**
   * Create a RetryPolicy for {@code FederationStateStoreFacade}. An
   * exponential back-off policy is registered for:
   * <ul>
   * <li>{@code FederationStateStoreRetriableException}</li>
   * <li>{@code CacheLoaderException}</li>
   * <li>{@code PoolInitializationException}</li>
   * </ul>
   * {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is passed as the default policy
   * for everything else.
   *
   * <p>The number of retries is read from
   * {@link YarnConfiguration#CLIENT_FAILOVER_RETRIES} (falling back to
   * {@code Integer.SIZE}) and the base sleep time, in milliseconds, from
   * {@link YarnConfiguration#CLIENT_FAILOVER_SLEEPTIME_BASE_MS}.
   *
   * @param conf the configuration holding the retry settings
   * @return the RetryPolicy for FederationStateStoreFacade
   */
  public static RetryPolicy createRetryPolicy(Configuration conf) {
    // Retry settings for StateStore
    int maxRetries =
        conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE);
    long sleepBaseMs = conf.getLong(
        YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
        YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
    RetryPolicy backoffPolicy = RetryPolicies.exponentialBackoffRetry(
        maxRetries, sleepBaseMs, TimeUnit.MILLISECONDS);

    Map<Class<? extends Exception>, RetryPolicy> policyByException = new HashMap<>();
    policyByException.put(FederationStateStoreRetriableException.class, backoffPolicy);
    policyByException.put(CacheLoaderException.class, backoffPolicy);
    policyByException.put(PoolInitializationException.class, backoffPolicy);

    return RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, policyByException);
  }

  /**
   * Returns the singleton instance of the FederationStateStoreFacade object.
   * If the singleton does not exist yet, it is created from a fresh default
   * {@link Configuration}.
   *
   * @return the singleton {@link FederationStateStoreFacade} instance
   */
  public static FederationStateStoreFacade getInstance() {
    // Fast path: once the singleton exists there is no reason to allocate a
    // new Configuration just to throw it away.
    FederationStateStoreFacade instance = facade;
    if (instance != null) {
      return instance;
    }
    return getInstanceInternal(new Configuration());
  }

  /**
   * Returns the singleton instance of the FederationStateStoreFacade object,
   * handing the given configuration to the internal lookup, which creates the
   * singleton on demand.
   *
   * @param conf configuration forwarded to the internal lookup.
   * @return the singleton {@link FederationStateStoreFacade} instance
   */
  public static FederationStateStoreFacade getInstance(Configuration conf) {
    final FederationStateStoreFacade instance = getInstanceInternal(conf);
    return instance;
  }

  /**
   * Returns the singleton, creating it first when it does not exist yet.
   *
   * @param conf configuration handed to the creation step.
   * @return the singleton {@link FederationStateStoreFacade} instance
   */
  private static FederationStateStoreFacade getInstanceInternal(Configuration conf) {
    if (facade == null) {
      generateStateStoreFacade(conf);
    }
    return facade;
  }

  /**
   * Generate the singleton instance of the FederationStateStoreFacade object.
   * The instance is created at most once: the field is checked before and
   * again after acquiring the class lock.
   *
   * @param conf configuration to build the facade from; when {@code null} a
   *          default {@link Configuration} is created instead.
   */
  private static void generateStateStoreFacade(Configuration conf) {
    if (facade != null) {
      return;
    }
    synchronized (FederationStateStoreFacade.class) {
      if (facade == null) {
        // Only allocate a default Configuration when the caller gave none.
        Configuration yarnConf = (conf != null) ? conf : new Configuration();
        facade = new FederationStateStoreFacade(yarnConf);
      }
    }
  }

  /**
   * Looks up the {@link SubClusterInfo} of a single sub-cluster. With caching
   * enabled the answer comes from the unfiltered sub-cluster map; otherwise
   * the state store is queried directly.
   *
   * @param subClusterId the identifier of the sub-cluster
   * @return the sub cluster information, or {@code null} if there is no
   *         mapping for the subClusterId or the store returned no response
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
      throws YarnException {
    if (!federationCache.isCachingEnabled()) {
      GetSubClusterInfoRequest request =
          GetSubClusterInfoRequest.newInstance(subClusterId);
      GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
      return (response != null) ? response.getSubClusterInfo() : null;
    }
    return getSubClusters(false).get(subClusterId);
  }

  /**
   * Optionally flushes the cached sub-clusters and then returns the
   * {@link SubClusterInfo} for the specified {@link SubClusterId}.
   *
   * @param subClusterId the identifier of the sub-cluster
   * @param flushCache whether the cached sub-clusters should be dropped
   *          before the lookup; ignored when caching is disabled
   * @return the sub cluster information
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
      final boolean flushCache) throws YarnException {
    final boolean mustFlush = flushCache && federationCache.isCachingEnabled();
    if (mustFlush) {
      LOG.info("Flushing subClusters from cache and rehydrating from store,"
          + " most likely on account of RM failover.");
      // The single-entry lookup reads getSubClusters(false), hence "false".
      federationCache.removeSubCluster(false);
    }
    return getSubCluster(subClusterId);
  }

  /**
   * Returns the {@link SubClusterInfo} of the known sub-clusters, read from
   * the cache when caching is enabled and from the state store otherwise.
   *
   * @param filterInactiveSubClusters whether to filter out inactive
   *          sub-clusters
   * @return the sub cluster information keyed by {@link SubClusterId}
   * @throws YarnException wrapping any {@link Throwable} raised by the lookup
   */
  public Map<SubClusterId, SubClusterInfo> getSubClusters(final boolean filterInactiveSubClusters)
      throws YarnException {
    try {
      if (!federationCache.isCachingEnabled()) {
        return buildSubClusterInfoMap(stateStore.getSubClusters(
            GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters)));
      }
      return federationCache.getSubClusters(filterInactiveSubClusters);
    } catch (Throwable ex) {
      // Every failure, checked or not, surfaces as a YarnException.
      throw new YarnException(ex);
    }
  }

  /**
   * Optionally flushes the cached sub-clusters and then returns the
   * {@link SubClusterInfo} of the known sub-clusters.
   *
   * @param filterInactiveSubClusters whether to filter out inactive
   *          sub-clusters
   * @param flushCache whether the cached sub-clusters should be dropped
   *          before the lookup; ignored when caching is disabled
   * @return the sub cluster information
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public Map<SubClusterId, SubClusterInfo> getSubClusters(
      final boolean filterInactiveSubClusters, final boolean flushCache)
      throws YarnException {
    final boolean mustFlush = flushCache && federationCache.isCachingEnabled();
    if (mustFlush) {
      LOG.info("Flushing subClusters from cache and rehydrating from store.");
      // NOTE(review): flushCache is always true at this point, so the cache is
      // asked to remove with "true" regardless of filterInactiveSubClusters --
      // confirm against FederationCache.removeSubCluster that this is intended.
      federationCache.removeSubCluster(flushCache);
    }
    return getSubClusters(filterInactiveSubClusters);
  }

  /**
   * Looks up the {@link SubClusterPolicyConfiguration} of a queue. With
   * caching enabled the answer comes from the map of all policies; otherwise
   * the state store is queried directly.
   *
   * @param queue the queue whose policy is required
   * @return the corresponding configured policy, or {@code null} if there is no
   *         mapping for the queue or the store returned no response
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public SubClusterPolicyConfiguration getPolicyConfiguration(final String queue)
      throws YarnException {
    if (federationCache.isCachingEnabled()) {
      return getPoliciesConfigurations().get(queue);
    }
    GetSubClusterPolicyConfigurationResponse response = stateStore.getPolicyConfiguration(
        GetSubClusterPolicyConfigurationRequest.newInstance(queue));
    return (response != null) ? response.getPolicyConfiguration() : null;
  }

  /**
   * Writes a policy configuration to the state store.
   *
   * @param policyConf the policy configuration to set
   * @throws YarnException if the request is invalid/fails
   */
  public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
      throws YarnException {
    SetSubClusterPolicyConfigurationRequest request =
        SetSubClusterPolicyConfigurationRequest.newInstance(policyConf);
    stateStore.setPolicyConfiguration(request);
  }

  /**
   * Returns the policies, represented as
   * {@link SubClusterPolicyConfiguration}, keyed by queue. They are read from
   * the cache when caching is enabled and from the state store otherwise.
   *
   * @return the policies keyed by queue
   * @throws YarnException wrapping any {@link Throwable} raised by the lookup
   */
  public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
      throws YarnException {
    try {
      if (!federationCache.isCachingEnabled()) {
        return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(
            GetSubClusterPoliciesConfigurationsRequest.newInstance()));
      }
      return federationCache.getPoliciesConfigurations();
    } catch (Throwable ex) {
      // Every failure, checked or not, surfaces as a YarnException.
      throw new YarnException(ex);
    }
  }

  /**
   * Registers the home {@link SubClusterId} of an {@link ApplicationId} in
   * the state store.
   *
   * @param appHomeSubCluster the mapping of the application to its home
   *          sub-cluster
   * @return the home sub-cluster reported back by the state store
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public SubClusterId addApplicationHomeSubCluster(
      ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
    AddApplicationHomeSubClusterRequest request =
        AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
    return stateStore.addApplicationHomeSubCluster(request).getHomeSubCluster();
  }

  /**
   * Overwrites the home {@link SubClusterId} stored for an
   * {@link ApplicationId}.
   *
   * @param appHomeSubCluster the mapping of the application to its home
   *          sub-cluster
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public void updateApplicationHomeSubCluster(
      ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
    UpdateApplicationHomeSubClusterRequest request =
        UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
    stateStore.updateApplicationHomeSubCluster(request);
  }

  /**
   * Resolves the home {@link SubClusterId} of an {@link ApplicationId}, from
   * the cache when caching is enabled and from the state store otherwise.
   *
   * @param appId the identifier of the application
   * @return the home sub cluster identifier
   * @throws YarnException wrapping any {@link Throwable} raised by the lookup
   */
  public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
      throws YarnException {
    try {
      if (!federationCache.isCachingEnabled()) {
        GetApplicationHomeSubClusterRequest request =
            GetApplicationHomeSubClusterRequest.newInstance(appId);
        GetApplicationHomeSubClusterResponse response =
            stateStore.getApplicationHomeSubCluster(request);
        return response.getApplicationHomeSubCluster().getHomeSubCluster();
      }
      return federationCache.getApplicationHomeSubCluster(appId);
    } catch (Throwable ex) {
      // Every failure, checked or not, surfaces as a YarnException.
      throw new YarnException(ex);
    }
  }

  /**
   * Returns the {@link SubClusterResolver} held by this facade.
   *
   * @return SubClusterResolver instance
   */
  public SubClusterResolver getSubClusterResolver() {
    return subclusterResolver;
  }

  /**
   * Returns the configuration this facade was last (re)initialized with.
   *
   * @return configuration object
   */
  public Configuration getConf() {
    return conf;
  }

  /**
   * Registers the home {@link SubClusterId} of a {@link ReservationId} in the
   * state store.
   *
   * @param appHomeSubCluster the mapping of the reservation to its home
   *          sub-cluster
   * @return the home sub-cluster reported back by the state store
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public SubClusterId addReservationHomeSubCluster(ReservationHomeSubCluster appHomeSubCluster)
      throws YarnException {
    AddReservationHomeSubClusterRequest request =
        AddReservationHomeSubClusterRequest.newInstance(appHomeSubCluster);
    return stateStore.addReservationHomeSubCluster(request).getHomeSubCluster();
  }

  /**
   * Resolves the home {@link SubClusterId} of a {@link ReservationId} by
   * querying the state store.
   *
   * @param reservationId the identifier of the reservation
   * @return the home subCluster identifier
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public SubClusterId getReservationHomeSubCluster(ReservationId reservationId)
      throws YarnException {
    GetReservationHomeSubClusterRequest request =
        GetReservationHomeSubClusterRequest.newInstance(reservationId);
    GetReservationHomeSubClusterResponse response =
        stateStore.getReservationHomeSubCluster(request);
    return response.getReservationHomeSubCluster().getHomeSubCluster();
  }

  /**
   * Overwrites the home {@link SubClusterId} stored for a
   * {@link ReservationId}.
   *
   * @param appHomeSubCluster the mapping of the reservation to its home
   *          sub-cluster
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public void updateReservationHomeSubCluster(ReservationHomeSubCluster appHomeSubCluster)
      throws YarnException {
    stateStore.updateReservationHomeSubCluster(
        UpdateReservationHomeSubClusterRequest.newInstance(appHomeSubCluster));
  }

  /**
   * Removes the home {@link SubClusterId} mapping stored for a
   * {@link ReservationId}.
   *
   * @param reservationId the identifier of the reservation
   * @throws YarnException if the call to the state store is unsuccessful
   */
  public void deleteReservationHomeSubCluster(ReservationId reservationId) throws YarnException {
    stateStore.deleteReservationHomeSubCluster(
        DeleteReservationHomeSubClusterRequest.newInstance(reservationId));
  }

  /**
   * Instantiates the class named in the configuration and wraps the instance
   * in a {@link RetryProxy} driven by the given {@link RetryPolicy}.
   *
   * @param conf the yarn configuration
   * @param configuredClassName the configuration provider key
   * @param defaultValue the default implementation for fallback
   * @param type the class for which a retry proxy is required
   * @param retryPolicy the policy for retrying method call failures
   * @param <T> The type of the instance.
   * @return a retry proxy for the specified interface
   */
  public static <T> Object createRetryInstance(Configuration conf,
      String configuredClassName, String defaultValue, Class<T> type,
      RetryPolicy retryPolicy) {
    T delegate = createInstance(conf, configuredClassName, defaultValue, type);
    return RetryProxy.create(type, delegate, retryPolicy);
  }

  /**
   * Instantiates the class whose name is stored in the configuration under
   * the given key, after checking that it is compatible with the requested
   * type.
   *
   * @param conf the yarn configuration
   * @param configuredClassName the configuration provider key
   * @param defaultValue the default implementation class
   * @param type the required interface/base class
   * @param <T> The type of the instance to create
   * @return the instance created
   * @throws YarnRuntimeException if the class cannot be found or is not
   *           assignable to {@code type}
   */
  public static <T> T createInstance(Configuration conf,
      String configuredClassName, String defaultValue, Class<T> type) {

    final String className = conf.get(configuredClassName, defaultValue);

    final Class<?> implClass;
    try {
      implClass = conf.getClassByName(className);
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate : " + className, e);
    }

    if (!type.isAssignableFrom(implClass)) {
      throw new YarnRuntimeException("Class: " + className
          + " not instance of " + type.getCanonicalName());
    }
    // Assignability was verified above, so the checked cast cannot fail.
    return type.cast(ReflectionUtils.newInstance(implClass, conf));
  }

  /**
   * Returns the state store client currently used by this facade.
   *
   * @return the {@link FederationStateStore} held by the facade
   */
  @VisibleForTesting
  public FederationStateStore getStateStore() {
    return this.stateStore;
  }

  /**
   * Converts the delegation key into a {@link RouterMasterKey} and stores it
   * in the state store.
   *
   * @param newKey Key used for generating and verifying delegation tokens
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   * @return RouterMasterKeyResponse
   */
  public RouterMasterKeyResponse storeNewMasterKey(DelegationKey newKey)
      throws YarnException, IOException {
    LOG.info("Storing master key with keyID {}.", newKey.getKeyId());
    ByteBuffer encodedKey = ByteBuffer.wrap(newKey.getEncodedKey());
    RouterMasterKey routerKey =
        RouterMasterKey.newInstance(newKey.getKeyId(), encodedKey, newKey.getExpiryDate());
    return stateStore.storeNewMasterKey(RouterMasterKeyRequest.newInstance(routerKey));
  }

  /**
   * Converts the delegation key into a {@link RouterMasterKey} and removes it
   * from the state store.
   *
   * @param newKey Key used for generating and verifying delegation tokens
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   */
  public void removeStoredMasterKey(DelegationKey newKey) throws YarnException, IOException {
    LOG.info("Removing master key with keyID {}.", newKey.getKeyId());
    ByteBuffer encodedKey = ByteBuffer.wrap(newKey.getEncodedKey());
    RouterMasterKey routerKey =
        RouterMasterKey.newInstance(newKey.getKeyId(), encodedKey, newKey.getExpiryDate());
    stateStore.removeStoredMasterKey(RouterMasterKeyRequest.newInstance(routerKey));
  }

  /**
   * Converts the delegation key into a {@link RouterMasterKey} and looks up
   * the matching master key in the state store.
   *
   * @param newKey Key used for generating and verifying delegation tokens
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   * @return RouterMasterKeyResponse
   */
  public RouterMasterKeyResponse getMasterKeyByDelegationKey(DelegationKey newKey)
      throws YarnException, IOException {
    // This is a read-only lookup; the log line must not claim a store.
    LOG.info("Getting master key with keyID {}.", newKey.getKeyId());
    ByteBuffer keyBytes = ByteBuffer.wrap(newKey.getEncodedKey());
    RouterMasterKey masterKey = RouterMasterKey.newInstance(newKey.getKeyId(),
        keyBytes, newKey.getExpiryDate());
    RouterMasterKeyRequest keyRequest = RouterMasterKeyRequest.newInstance(masterKey);
    return stateStore.getMasterKeyByDelegationKey(keyRequest);
  }

  /**
   * Stores an {@link RMDelegationTokenIdentifier} together with its renew
   * date in the state store.
   *
   * @param identifier delegation tokens from the RM
   * @param renewDate renewDate
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   */
  public void storeNewToken(RMDelegationTokenIdentifier identifier,
      long renewDate) throws YarnException, IOException {
    LOG.info("storing RMDelegation token with sequence number: {}.",
        identifier.getSequenceNumber());
    RouterStoreToken token = RouterStoreToken.newInstance(identifier, renewDate);
    stateStore.storeNewToken(RouterRMTokenRequest.newInstance(token));
  }

  /**
   * Stores an {@link RMDelegationTokenIdentifier} together with its renew
   * date and token info in the state store.
   *
   * @param identifier delegation tokens from the RM.
   * @param renewDate renewDate.
   * @param tokenInfo tokenInfo.
   * @throws YarnException if the call to the state store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  public void storeNewToken(RMDelegationTokenIdentifier identifier,
      long renewDate, String tokenInfo) throws YarnException, IOException {
    LOG.info("storing RMDelegation token with sequence number: {}.",
        identifier.getSequenceNumber());
    RouterStoreToken token =
        RouterStoreToken.newInstance(identifier, renewDate, tokenInfo);
    stateStore.storeNewToken(RouterRMTokenRequest.newInstance(token));
  }

  /**
   * Updates the stored {@link RMDelegationTokenIdentifier} with the given
   * renew date.
   *
   * @param identifier delegation tokens from the RM
   * @param renewDate renewDate
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   */
  public void updateStoredToken(RMDelegationTokenIdentifier identifier,
      long renewDate) throws YarnException, IOException {
    LOG.info("updating RMDelegation token with sequence number: {}.",
        identifier.getSequenceNumber());
    RouterStoreToken token = RouterStoreToken.newInstance(identifier, renewDate);
    stateStore.updateStoredToken(RouterRMTokenRequest.newInstance(token));
  }

  /**
   * Updates the stored {@link RMDelegationTokenIdentifier} with the given
   * renew date and token info.
   *
   * @param identifier delegation tokens from the RM
   * @param renewDate renewDate
   * @param tokenInfo tokenInfo.
   * @throws YarnException if the call to the state store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  public void updateStoredToken(RMDelegationTokenIdentifier identifier,
      long renewDate, String tokenInfo) throws YarnException, IOException {
    LOG.info("updating RMDelegation token with sequence number: {}.",
        identifier.getSequenceNumber());
    RouterStoreToken token =
        RouterStoreToken.newInstance(identifier, renewDate, tokenInfo);
    stateStore.updateStoredToken(RouterRMTokenRequest.newInstance(token));
  }

  /**
   * Removes the stored {@link RMDelegationTokenIdentifier} from the state
   * store. The request is built with a renew date of {@code 0L}.
   *
   * @param identifier delegation tokens from the RM
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   */
  public void removeStoredToken(RMDelegationTokenIdentifier identifier)
      throws YarnException, IOException {
    LOG.info("removing RMDelegation token with sequence number: {}.",
        identifier.getSequenceNumber());
    RouterStoreToken token = RouterStoreToken.newInstance(identifier, 0L);
    stateStore.removeStoredToken(RouterRMTokenRequest.newInstance(token));
  }

  /**
   * Looks up the stored token matching the given
   * {@link RMDelegationTokenIdentifier}. The request is built with a renew
   * date of {@code 0L}.
   *
   * @param identifier delegation tokens from the RM
   * @return RouterRMTokenResponse returned by the state store
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   */
  public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentifier identifier)
      throws YarnException, IOException {
    LOG.info("get RouterStoreToken token with sequence number: {}.",
        identifier.getSequenceNumber());
    RouterStoreToken token = RouterStoreToken.newInstance(identifier, 0L);
    return stateStore.getTokenByRouterStoreToken(RouterRMTokenRequest.newInstance(token));
  }

  /**
   * Asks the state store to increment the delegation token sequence number.
   *
   * @return the value returned by the state store's increment call.
   */
  public int incrementDelegationTokenSeqNum() {
    return this.stateStore.incrementDelegationTokenSeqNum();
  }

  /**
   * Reads the delegation token sequence number from the state store.
   *
   * @return delegationTokenSequenceNumber.
   */
  public int getDelegationTokenSeqNum() {
    return this.stateStore.getDelegationTokenSeqNum();
  }

  /**
   * Sets the delegation token sequence number in the state store.
   * This is a direct delegation to
   * {@code stateStore.setDelegationTokenSeqNum(int)}.
   *
   * @param seqNum delegationTokenSequenceNumber to hand to the state store.
   */
  public void setDelegationTokenSeqNum(int seqNum) {
    stateStore.setDelegationTokenSeqNum(seqNum);
  }

  /**
   * Gets the current key id from the state store.
   * This is a direct delegation to {@code stateStore.getCurrentKeyId()}.
   *
   * @return currentKeyId as reported by the state store.
   */
  public int getCurrentKeyId() {
    return stateStore.getCurrentKeyId();
  }

  /**
   * Asks the state store to increment the current key id.
   * This is a direct delegation to {@code stateStore.incrementCurrentKeyId()}.
   *
   * @return the value returned by the state store for the increment
   * (presumably the new currentKeyId - confirm with the store contract).
   */
  public int incrementCurrentKeyId() {
    return stateStore.incrementCurrentKeyId();
  }

  /**
   * Get the number of active sub-clusters.
   *
   * @return number of active sub-clusters; 0 when the lookup yields
   * {@code null} or an empty map.
   * @throws YarnException if the call to the state store is unsuccessful.
   */
  public int getActiveSubClustersCount() throws YarnException {
    Map<SubClusterId, SubClusterInfo> active = getSubClusters(true);
    // A missing map is treated the same as an empty one.
    return active == null ? 0 : active.size();
  }

  /**
   * Randomly pick an active SubCluster that is not on the blacklist.
   *
   * @param activeSubClusters active subClusters, keyed by their id.
   * @param blackList subClusters that must not be picked; may be null or empty.
   * @return the id of the selected active SubCluster.
   * @throws YarnException when there is no active SubCluster at all, or none is
   * left after the blacklist has been applied
   * (No active SubCluster available to submit the request.)
   */
  public static SubClusterId getRandomActiveSubCluster(
      Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
      throws YarnException {

    // Nothing to choose from at all.
    if (MapUtils.isEmpty(activeSubClusters)) {
      throw new FederationPolicyException(
          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
    }

    // Work on a private copy of the ids so the caller's map is never modified.
    List<SubClusterId> candidates = new ArrayList<>(activeSubClusters.keySet());

    // Drop every blacklisted subCluster from the candidates.
    if (blackList != null && !blackList.isEmpty()) {
      candidates.removeAll(blackList);
    }

    // The blacklist may have eliminated every candidate.
    if (candidates.isEmpty()) {
      throw new FederationPolicyException(
          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
    }

    // Pick one of the remaining candidates at random.
    int pick = rand.nextInt(candidates.size());
    return candidates.get(pick);
  }

  /**
   * Get the effective number of retries: the smaller of the configured value
   * and the number of active sub-clusters, never less than zero.
   *
   * @param configRetries User-configured number of retries.
   * @return number of retries.
   * @throws YarnException yarn exception.
   */
  public int getRetryNumbers(int configRetries) throws YarnException {
    int upperBound = Math.min(getActiveSubClustersCount(), configRetries);
    // A negative configured value is not expected, but if one is supplied
    // it is clamped to 0.
    return Math.max(upperBound, 0);
  }

  /**
   * Check whether a home SubCluster is recorded for the given applicationId.
   *
   * A lookup that fails with a {@code YarnException} is logged at debug level
   * and reported as "does not exist".
   *
   * @param applicationId applicationId
   * @return true, SubClusterId exists; false, SubClusterId not exists.
   */
  public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
    try {
      return getApplicationHomeSubCluster(applicationId) != null;
    } catch (YarnException e) {
      LOG.debug("get homeSubCluster by applicationId = {} error.", applicationId, e);
      return false;
    }
  }

  /**
   * Add ApplicationHomeSubCluster to FederationStateStore.
   *
   * @param applicationId applicationId, used only to build the error message.
   * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
   * @throws YarnException if the mapping cannot be inserted; the original
   * exception is kept as the cause.
   */
  public void addApplicationHomeSubCluster(ApplicationId applicationId,
      ApplicationHomeSubCluster homeSubCluster) throws YarnException {
    try {
      addApplicationHomeSubCluster(homeSubCluster);
    } catch (YarnException e) {
      // Re-throw with the application id in the message for easier diagnosis.
      throw new YarnException(String.format(
          "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId),
          e);
    }
  }

  /**
   * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
   * all submitted applications to their home sub-cluster.
   *
   * @return the mapping of all submitted applications to their home sub-cluster
   * @throws YarnException if the request is invalid/fails
   */
  public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster() throws YarnException {
    GetApplicationsHomeSubClusterRequest request =
        GetApplicationsHomeSubClusterRequest.newInstance();
    return stateStore.getApplicationsHomeSubCluster(request).getAppsHomeSubClusters();
  }

  /**
   * Delete the mapping of home {@code SubClusterId} of a previously submitted
   * {@code ApplicationId}. The method returns normally when the operation is
   * successful; otherwise an exception reports the reason for the failure.
   *
   * @param applicationId the application to delete the home sub-cluster of
   * @throws YarnException if the request is invalid/fails
   */
  public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
      throws YarnException {
    DeleteApplicationHomeSubClusterRequest request =
        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId);
    stateStore.deleteApplicationHomeSubCluster(request);
  }

  /**
   * Update ApplicationHomeSubCluster to FederationStateStore.
   *
   * If the update fails, the home sub-cluster currently recorded for the
   * application is read back from the state store. When it equals
   * {@code subClusterId}, the failure is only logged and the method returns
   * normally; otherwise the failure is re-thrown.
   *
   * @param subClusterId homeSubClusterId expected to be recorded for the application.
   * @param applicationId applicationId.
   * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
   * @throws YarnException if the update fails and the recorded home sub-cluster
   * differs from {@code subClusterId}, or if reading it back fails.
   */
  public void updateApplicationHomeSubCluster(SubClusterId subClusterId,
      ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
    try {
      updateApplicationHomeSubCluster(homeSubCluster);
    } catch (YarnException e) {
      SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId);
      // Compare by value, not by reference: the id read back from the state store
      // may be a distinct object even when it denotes the same sub-cluster.
      // The leading identity check keeps the "both null" case matching.
      boolean sameHomeSubCluster = subClusterId == subClusterIdInStateStore
          || (subClusterId != null && subClusterId.equals(subClusterIdInStateStore));
      if (sameHomeSubCluster) {
        LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId);
      } else {
        String msg = String.format(
            "Unable to update the ApplicationId %s into the FederationStateStore.", applicationId);
        throw new YarnException(msg, e);
      }
    }
  }

  /**
   * Add or Update ApplicationHomeSubCluster.
   *
   * The mapping is added when no mapping exists yet or when this is the first
   * attempt ({@code retryCount == 0}); otherwise the existing mapping is updated.
   *
   * @param applicationId applicationId, is the id of the application.
   * @param subClusterId homeSubClusterId, this is selected by strategy.
   * @param retryCount number of retries.
   * @param appSubmissionContext appSubmissionContext.
   * @throws YarnException yarn exception.
   */
  public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
      SubClusterId subClusterId, int retryCount, ApplicationSubmissionContext appSubmissionContext)
      throws YarnException {
    boolean alreadyMapped = existsApplicationHomeSubCluster(applicationId);
    ApplicationHomeSubCluster mapping = ApplicationHomeSubCluster.newInstance(
        applicationId, Time.now(), subClusterId, appSubmissionContext);

    if (alreadyMapped && retryCount != 0) {
      // A mapping exists and this is a retry: point the application at the
      // newly selected home subClusterId.
      updateApplicationHomeSubCluster(subClusterId, applicationId, mapping);
      return;
    }

    // First attempt, or nothing stored yet: persist the mapping of the
    // applicationId to the subClusterId selected as its home.
    addApplicationHomeSubCluster(applicationId, mapping);
  }

  /**
   * Check whether a home SubCluster is recorded for the given reservationId.
   *
   * A lookup that fails with a {@code YarnException} is logged at debug level
   * and reported as "does not exist".
   *
   * @param reservationId reservationId
   * @return true - exist, false - not exist
   */
  public boolean existsReservationHomeSubCluster(ReservationId reservationId) {
    try {
      return getReservationHomeSubCluster(reservationId) != null;
    } catch (YarnException e) {
      LOG.debug("get homeSubCluster by reservationId = {} error.", reservationId, e);
      return false;
    }
  }

  /**
   * Save Reservation And HomeSubCluster Mapping.
   *
   * @param reservationId reservationId, used only to build the error message
   * @param homeSubCluster homeSubCluster
   * @throws YarnException if the mapping cannot be inserted; the original
   * exception is kept as the cause
   */
  public void addReservationHomeSubCluster(ReservationId reservationId,
      ReservationHomeSubCluster homeSubCluster) throws YarnException {
    try {
      // Persist the mapping of the reservationId to the subClusterId that was
      // selected as its home.
      addReservationHomeSubCluster(homeSubCluster);
    } catch (YarnException e) {
      // Re-throw with the reservation id in the message for easier diagnosis.
      throw new YarnException(String.format(
          "Unable to insert the ReservationId %s into the FederationStateStore.", reservationId),
          e);
    }
  }

  /**
   * Update Reservation And HomeSubCluster Mapping.
   *
   * If the update fails, the home sub-cluster currently recorded for the
   * reservation is read back from the state store. When it equals
   * {@code subClusterId}, the failure is only logged and the method returns
   * normally; otherwise the failure is re-thrown.
   *
   * @param subClusterId subClusterId expected to be recorded for the reservation
   * @param reservationId reservationId
   * @param homeSubCluster homeSubCluster
   * @throws YarnException if the update fails and the recorded home sub-cluster
   * differs from {@code subClusterId}, or if reading it back fails
   */
  public void updateReservationHomeSubCluster(SubClusterId subClusterId,
      ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
    try {
      // update the mapping of reservationId and the home subClusterId to
      // the new subClusterId we have selected
      updateReservationHomeSubCluster(homeSubCluster);
    } catch (YarnException e) {
      SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId);
      // Compare by value, not by reference: the id read back from the state store
      // may be a distinct object even when it denotes the same sub-cluster.
      // The leading identity check keeps the "both null" case matching.
      boolean sameHomeSubCluster = subClusterId == subClusterIdInStateStore
          || (subClusterId != null && subClusterId.equals(subClusterIdInStateStore));
      if (sameHomeSubCluster) {
        LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
      } else {
        String msg = String.format(
            "Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
        throw new YarnException(msg, e);
      }
    }
  }

  /**
   * Add or Update ReservationHomeSubCluster.
   *
   * The mapping is added when no mapping exists yet or when this is the first
   * attempt ({@code retryCount == 0}); otherwise the existing mapping is updated.
   *
   * @param reservationId reservationId.
   * @param subClusterId homeSubClusterId, this is selected by strategy.
   * @param retryCount number of retries.
   * @throws YarnException yarn exception.
   */
  public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId,
      SubClusterId subClusterId, int retryCount) throws YarnException {
    boolean alreadyMapped = existsReservationHomeSubCluster(reservationId);
    ReservationHomeSubCluster mapping =
        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);

    if (alreadyMapped && retryCount != 0) {
      // A mapping exists and this is a retry: point the reservation at the
      // newly selected home subClusterId.
      updateReservationHomeSubCluster(subClusterId, reservationId, mapping);
      return;
    }

    // First attempt, or nothing stored yet: persist the mapping of the
    // reservationId to the subClusterId selected as its home.
    addReservationHomeSubCluster(reservationId, mapping);
  }

  /**
   * Deregister subCluster, updating the subCluster state to
   * e.g. SC_LOST or SC_DECOMMISSIONED.
   *
   * @param subClusterId subClusterId.
   * @param subClusterState The state of the subCluster to be updated.
   * @throws YarnException yarn exception.
   * @return true if the state store returned a non-null response to the
   * deregister request, otherwise false.
   */
  public boolean deregisterSubCluster(SubClusterId subClusterId,
      SubClusterState subClusterState) throws YarnException {
    SubClusterDeregisterRequest request =
        SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState);
    // A non-null response is taken to mean the deregistration succeeded.
    return stateStore.deregisterSubCluster(request) != null;
  }

  /**
   * Get active subclusters.
   *
   * Note that this method does not propagate failures: any exception raised
   * while looking up the sub-clusters, including the {@code NotFoundException}
   * raised below for an empty result (assuming it is an {@code Exception}
   * subtype, as its name indicates), is caught, logged at error level, and
   * turned into a {@code null} return value. Callers must therefore check for
   * {@code null}.
   *
   * @return the active subclusters as a Collection, or {@code null} if there
   * are none or the lookup failed.
   * @throws NotFoundException declared in the signature; as implemented it is
   * caught inside this method rather than reaching the caller.
   */
  public Collection<SubClusterInfo> getActiveSubClusters()
      throws NotFoundException {
    Collection<SubClusterInfo> activeInfos = null;
    try {
      Map<SubClusterId, SubClusterInfo> activeMap = getSubClusters(true);
      if (MapUtils.isEmpty(activeMap)) {
        throw new NotFoundException("Not Found SubClusters.");
      }
      activeInfos = activeMap.values();
    } catch (Exception e) {
      // Best-effort lookup: log the problem and fall through to return null.
      LOG.error("getActiveSubClusters failed.", e);
    }
    return activeInfos;
  }

  /**
   * Get ApplicationSubmissionContext according to ApplicationId.
   * This method does not throw: any exception raised during the lookup
   * (for example because the application cannot be found) is logged at error
   * level and {@code null} is returned instead.
   *
   * @param appId ApplicationId
   * @return ApplicationSubmissionContext of ApplicationId, or {@code null} if
   * the lookup failed
   */
  public ApplicationSubmissionContext getApplicationSubmissionContext(ApplicationId appId) {
    try {
      GetApplicationHomeSubClusterRequest request =
          GetApplicationHomeSubClusterRequest.newInstance(appId);
      return stateStore.getApplicationHomeSubCluster(request)
          .getApplicationHomeSubCluster()
          .getApplicationSubmissionContext();
    } catch (Exception e) {
      LOG.error("getApplicationSubmissionContext error, applicationId = {}.", appId, e);
      return null;
    }
  }

  /**
   * Ask the state store to delete all policy configurations.
   *
   * @throws Exception if the call to the state store fails.
   */
  public void deleteAllPoliciesConfigurations() throws Exception {
    stateStore.deleteAllPoliciesConfigurations(
        DeletePoliciesConfigurationsRequest.newInstance());
  }

  /**
   * Get the {@code FederationCache} held by this facade.
   * Exposed for tests, as the {@code @VisibleForTesting} marker indicates.
   *
   * @return the federationCache field of this facade.
   */
  @VisibleForTesting
  public FederationCache getFederationCache() {
    return federationCache;
  }

  /**
   * Ask the state store to delete itself by delegating to
   * {@code stateStore.deleteStateStore()}.
   * NOTE(review): presumably this wipes all federation state - confirm the
   * exact scope with the store implementation before calling.
   *
   * @throws Exception if the call to the state store fails.
   */
  public void deleteStore() throws Exception {
    stateStore.deleteStateStore();
  }

  /**
   * Ask the state store to delete the policy configurations of the given queues.
   *
   * @param queuesList queues whose policy configurations are to be deleted;
   * must not be null or empty.
   * @throws YarnException if {@code queuesList} is null or empty, or if the
   * call to the state store fails.
   */
  public void deletePolicyConfigurations(List<String> queuesList) throws YarnException {
    // Reject a missing or empty queue list up front.
    if (queuesList == null || queuesList.isEmpty()) {
      throw new YarnException("queuesList cannot be empty!");
    }
    stateStore.deletePoliciesConfigurations(
        DeleteSubClusterPoliciesConfigurationsRequest.newInstance(queuesList));
  }
}