// FederationRMAdminInterceptor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.router.rmadmin;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
import org.apache.hadoop.yarn.server.api.protocolrecords.FederationSubCluster;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetSubClustersRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetSubClustersResponse;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
import java.util.Set;
import java.util.Date;
import java.util.HashSet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.hadoop.yarn.server.router.RouterServerUtil.checkPolicyManagerValid;

public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {

  private static final Logger LOG =
      LoggerFactory.getLogger(FederationRMAdminInterceptor.class);

  // Separator constants; presumably used to split/join composite admin
  // parameters elsewhere in this class — usage not visible in this chunk.
  private static final String COMMA = ",";
  private static final String COLON = ":";

  // Policy managers expressed as subcluster weight maps. NOTE(review): looks
  // like the save/batch-save queue-policy operations only accept these
  // managers — confirm against the policy-handling methods below this chunk.
  private static final List<String> SUPPORT_WEIGHT_MANAGERS =
      new ArrayList<>(Arrays.asList(WeightedLocalityPolicyManager.class.getName(),
      PriorityBroadcastPolicyManager.class.getName(), WeightedHomePolicyManager.class.getName()));

  // Cache of per-subcluster admin-protocol proxies, populated lazily by
  // getAdminRMProxyForSubCluster (backed by a ConcurrentHashMap in init()).
  private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
  private FederationStateStoreFacade federationFacade;
  private final Clock clock = new MonotonicClock();
  private RouterMetrics routerMetrics;
  // Pool used by RMAdminProtocolMethod.invokeConcurrent for fan-out calls.
  private ThreadPoolExecutor executorService;
  private Configuration conf;
  // Heartbeat age threshold read in milliseconds from
  // ROUTER_SUBCLUSTER_EXPIRATION_TIME (see init()).
  private long heartbeatExpirationMillis;

  /**
   * Initializes this interceptor: builds the thread pool used for concurrent
   * fan-out calls to subcluster RMs, the state-store facade, the admin-proxy
   * cache, and metrics.
   *
   * @param userName the user on whose behalf this interceptor operates;
   *                 also embedded in worker-thread names.
   */
  @Override
  public void init(String userName) {
    super.init(userName);

    int numThreads = getConf().getInt(
        YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();

    // The duration is resolved to SECONDS here...
    long keepAliveTime = getConf().getTimeDuration(
        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS);

    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    // ...so the executor must be told SECONDS as well. (BUGFIX: this value was
    // previously passed as TimeUnit.MILLISECONDS, silently shortening the
    // configured keep-alive by a factor of 1000.)
    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
        keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory);

    boolean allowCoreThreadTimeOut = getConf().getBoolean(
        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);

    // Core threads may only time out when a positive keep-alive is configured;
    // allowCoreThreadTimeOut(true) with keepAlive == 0 would throw.
    if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
      this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    }

    federationFacade = FederationStateStoreFacade.getInstance(this.getConf());
    this.conf = this.getConf();
    this.adminRMProxies = new ConcurrentHashMap<>();
    routerMetrics = RouterMetrics.getMetrics();

    this.heartbeatExpirationMillis = this.conf.getTimeDuration(
        YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME,
        YarnConfiguration.DEFAULT_ROUTER_SUBCLUSTER_EXPIRATION_TIME, TimeUnit.MILLISECONDS);
  }

  /**
   * Returns a cached {@link ResourceManagerAdministrationProtocol} proxy for
   * the given subcluster, creating (and caching) one on first use. When
   * hadoop.security.authorization is enabled, the proxy is created as a proxy
   * user on top of the login user.
   *
   * @param subClusterId target subcluster.
   * @return an admin-protocol proxy for that subcluster.
   * @throws Exception if the proxy cannot be created.
   */
  @VisibleForTesting
  protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
      SubClusterId subClusterId) throws Exception {

    // Fast path: reuse an already-created proxy.
    ResourceManagerAdministrationProtocol cached = adminRMProxies.get(subClusterId);
    if (cached != null) {
      return cached;
    }

    ResourceManagerAdministrationProtocol proxy = null;
    try {
      boolean serviceAuthEnabled = this.conf.getBoolean(
          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
      // With service-level authorization on, impersonate the caller via the
      // login user; otherwise connect directly as the caller.
      UserGroupInformation effectiveUser = serviceAuthEnabled
          ? UserGroupInformation.createProxyUser(
              user.getShortUserName(), UserGroupInformation.getLoginUser())
          : user;
      proxy = FederationProxyProviderUtil.createRMProxy(getConf(),
          ResourceManagerAdministrationProtocol.class, subClusterId, effectiveUser);
    } catch (Exception e) {
      RouterServerUtil.logAndThrowException(e,
          "Unable to create the interface to reach the SubCluster %s", subClusterId);
    }
    adminRMProxies.put(subClusterId, proxy);
    return proxy;
  }

  /**
   * Unsupported: this interceptor must terminate the interceptor chain, so
   * attaching a successor is always a configuration error.
   *
   * @param next ignored.
   */
  @Override
  public void setNextInterceptor(RMAdminRequestInterceptor next) {
    String message = "setNextInterceptor is being called on "
        + "FederationRMAdminRequestInterceptor, which should be the last one "
        + "in the chain. Check if the interceptor pipeline configuration "
        + "is correct";
    throw new YarnRuntimeException(message);
  }

  /**
   * Refreshes RM queues, either on every active subcluster at once or on a
   * single subcluster when the request carries a subClusterId.
   *
   * @param request RefreshQueuesRequest; a non-empty subClusterId targets that
   * subcluster only, an empty one fans out to all active subclusters.
   * @return a non-null RefreshQueuesResponse on success (it carries no data).
   * @throws StandbyException exception thrown by non-active server.
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
      throws StandbyException, YarnException, IOException {

    // Reject null requests up front.
    if (request == null) {
      routerMetrics.incrRefreshQueuesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshQueues request.", null);
    }

    try {
      long begin = clock.getTime();
      String targetSubCluster = request.getSubClusterId();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[] {RefreshQueuesRequest.class}, new Object[] {request});

      // Fan out to the selected subcluster(s); any per-subcluster failure
      // surfaces as a YarnException below.
      Collection<RefreshQueuesResponse> responses =
          method.invokeConcurrent(this, RefreshQueuesResponse.class, targetSubCluster);

      // A non-empty result set means every targeted subcluster succeeded.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededRefreshQueuesRetrieved(clock.getTime() - begin);
        return RefreshQueuesResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshQueuesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshQueue due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshQueuesFailedRetrieved();
    throw new YarnException("Unable to refreshQueue.");
  }

  /**
   * Refreshes RM nodes, either on every active subcluster at once or on a
   * single subcluster when the request carries a subClusterId.
   *
   * @param request RefreshNodesRequest; a non-empty subClusterId targets that
   * subcluster only, an empty one fans out to all active subclusters.
   * @return a non-null RefreshNodesResponse on success (it carries no data).
   * @throws StandbyException exception thrown by non-active server.
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
      throws StandbyException, YarnException, IOException {

    // Only the request itself is validated; DecommissionType has a proto-level
    // default and so is never empty.
    if (request == null) {
      routerMetrics.incrRefreshNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshNodes request.", null);
    }

    try {
      long begin = clock.getTime();
      String targetSubCluster = request.getSubClusterId();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[] {RefreshNodesRequest.class}, new Object[] {request});

      Collection<RefreshNodesResponse> responses =
          method.invokeConcurrent(this, RefreshNodesResponse.class, targetSubCluster);

      // Non-empty result set == all targeted subclusters succeeded.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededRefreshNodesRetrieved(clock.getTime() - begin);
        return RefreshNodesResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshNodes due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshNodesFailedRetrieved();
    throw new YarnException("Unable to refreshNodes due to exception.");
  }

  /**
   * Refreshes the superuser-groups configuration, either on every active
   * subcluster at once or on a single subcluster when the request carries a
   * subClusterId.
   *
   * @param request RefreshSuperUserGroupsConfigurationRequest; a non-empty
   * subClusterId targets that subcluster only, an empty one fans out to all
   * active subclusters.
   * @return a non-null RefreshSuperUserGroupsConfigurationResponse on success
   * (it carries no data).
   * @throws StandbyException exception thrown by non-active server.
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
      RefreshSuperUserGroupsConfigurationRequest request)
      throws StandbyException, YarnException, IOException {

    // Reject null requests up front.
    if (request == null) {
      routerMetrics.incrRefreshSuperUserGroupsConfigurationFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshSuperUserGroupsConfiguration request.",
          null);
    }

    try {
      long begin = clock.getTime();
      String targetSubCluster = request.getSubClusterId();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[] {RefreshSuperUserGroupsConfigurationRequest.class}, new Object[] {request});

      Collection<RefreshSuperUserGroupsConfigurationResponse> responses =
          method.invokeConcurrent(this, RefreshSuperUserGroupsConfigurationResponse.class,
          targetSubCluster);

      // Non-empty result set == all targeted subclusters succeeded.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededRefreshSuperUserGroupsConfRetrieved(clock.getTime() - begin);
        return RefreshSuperUserGroupsConfigurationResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshSuperUserGroupsConfigurationFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshSuperUserGroupsConfiguration due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshSuperUserGroupsConfigurationFailedRetrieved();
    throw new YarnException("Unable to refreshSuperUserGroupsConfiguration.");
  }

  /**
   * Refreshes user-to-groups mappings, either on every active subcluster at
   * once or on a single subcluster when the request carries a subClusterId.
   *
   * @param request RefreshUserToGroupsMappingsRequest; a non-empty
   * subClusterId targets that subcluster only, an empty one fans out to all
   * active subclusters.
   * @return a non-null RefreshUserToGroupsMappingsResponse on success
   * (it carries no data).
   * @throws StandbyException exception thrown by non-active server.
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
      RefreshUserToGroupsMappingsRequest request) throws StandbyException, YarnException,
      IOException {

    // Reject null requests up front.
    if (request == null) {
      routerMetrics.incrRefreshUserToGroupsMappingsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshUserToGroupsMappings request.", null);
    }

    try {
      long begin = clock.getTime();
      String targetSubCluster = request.getSubClusterId();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[] {RefreshUserToGroupsMappingsRequest.class}, new Object[] {request});

      Collection<RefreshUserToGroupsMappingsResponse> responses =
          method.invokeConcurrent(this, RefreshUserToGroupsMappingsResponse.class,
          targetSubCluster);

      // Non-empty result set == all targeted subclusters succeeded.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededRefreshUserToGroupsMappingsRetrieved(clock.getTime() - begin);
        return RefreshUserToGroupsMappingsResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshUserToGroupsMappingsFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshUserToGroupsMappings due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshUserToGroupsMappingsFailedRetrieved();
    throw new YarnException("Unable to refreshUserToGroupsMappings.");
  }

  /**
   * Refreshes admin ACLs, either on every active subcluster at once or on a
   * single subcluster when the request carries a subClusterId.
   *
   * @param request RefreshAdminAclsRequest; a non-empty subClusterId targets
   * that subcluster only, an empty one fans out to all active subclusters.
   * @return a non-null RefreshAdminAclsResponse on success (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshAdminAclsResponse refreshAdminAcls(RefreshAdminAclsRequest request)
      throws YarnException, IOException {

    // Reject null requests up front.
    if (request == null) {
      routerMetrics.incrRefreshAdminAclsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshAdminAcls request.", null);
    }

    try {
      long begin = clock.getTime();
      String targetSubCluster = request.getSubClusterId();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[] {RefreshAdminAclsRequest.class}, new Object[] {request});
      Collection<RefreshAdminAclsResponse> responses =
          method.invokeConcurrent(this, RefreshAdminAclsResponse.class, targetSubCluster);
      // Non-empty result set == all targeted subclusters succeeded.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededRefreshAdminAclsRetrieved(clock.getTime() - begin);
        return RefreshAdminAclsResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshAdminAclsFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshAdminAcls due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshAdminAclsFailedRetrieved();
    throw new YarnException("Unable to refreshAdminAcls.");
  }

  /**
   * Refreshes service-level ACLs, either on every active subcluster at once
   * or on a single subcluster when the request carries a subClusterId.
   *
   * @param request RefreshServiceAclsRequest; a non-empty subClusterId targets
   * that subcluster only, an empty one fans out to all active subclusters.
   * @return a non-null RefreshServiceAclsResponse on success (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request)
      throws YarnException, IOException {

    // parameter verification.
    if (request == null) {
      routerMetrics.incrRefreshServiceAclsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshServiceAcls request.", null);
    }

    // call refreshServiceAcls of activeSubClusters.
    try {
      long startTime = clock.getTime();
      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
          new Class[]{RefreshServiceAclsRequest.class}, new Object[]{request});
      String subClusterId = request.getSubClusterId();
      Collection<RefreshServiceAclsResponse> refreshServiceAclsResps =
          remoteMethod.invokeConcurrent(this, RefreshServiceAclsResponse.class, subClusterId);
      if (CollectionUtils.isNotEmpty(refreshServiceAclsResps)) {
        long stopTime = clock.getTime();
        routerMetrics.succeededRefreshServiceAclsRetrieved(stopTime - startTime);
        return RefreshServiceAclsResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshServiceAclsFailedRetrieved();
      // BUGFIX: the error message previously said "refreshAdminAcls",
      // a copy-paste from the method above.
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshServiceAcls due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshServiceAclsFailedRetrieved();
    throw new YarnException("Unable to refreshServiceAcls.");
  }

  /**
   * Updates the resource of a node in one specific subcluster. Unlike the
   * refresh operations, this call requires a subClusterId.
   *
   * @param request UpdateNodeResourceRequest; must carry a non-blank
   * subClusterId identifying the subcluster that owns the node.
   * @return a non-null UpdateNodeResourceResponse on success (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public UpdateNodeResourceResponse updateNodeResource(UpdateNodeResourceRequest request)
      throws YarnException, IOException {

    // Both the request and its subClusterId are mandatory here.
    if (request == null) {
      routerMetrics.incrUpdateNodeResourceFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing UpdateNodeResource request.", null);
    }

    String targetSubCluster = request.getSubClusterId();
    if (StringUtils.isBlank(targetSubCluster)) {
      routerMetrics.incrUpdateNodeResourceFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing UpdateNodeResource SubClusterId.", null);
    }

    try {
      long begin = clock.getTime();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[]{UpdateNodeResourceRequest.class}, new Object[]{request});
      Collection<UpdateNodeResourceResponse> responses =
          method.invokeConcurrent(this, UpdateNodeResourceResponse.class, targetSubCluster);
      // A non-empty result means the targeted subcluster accepted the update.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededUpdateNodeResourceRetrieved(clock.getTime() - begin);
        return UpdateNodeResourceResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrUpdateNodeResourceFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to updateNodeResource due to exception. " + e.getMessage());
    }

    routerMetrics.incrUpdateNodeResourceFailedRetrieved();
    throw new YarnException("Unable to updateNodeResource.");
  }

  /**
   * Refreshes node resources in one specific subcluster. Unlike the cluster
   * wide refresh operations, this call requires a subClusterId.
   *
   * @param request RefreshNodesResourcesRequest; must carry a non-blank
   * subClusterId identifying the subcluster to refresh.
   * @return a non-null RefreshNodesResourcesResponse on success
   * (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResourcesRequest request)
      throws YarnException, IOException {

    // Both the request and its subClusterId are mandatory here.
    if (request == null) {
      routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshNodesResources request.", null);
    }

    String targetSubCluster = request.getSubClusterId();
    if (StringUtils.isBlank(targetSubCluster)) {
      routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshNodesResources SubClusterId.", null);
    }

    try {
      long begin = clock.getTime();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[]{RefreshNodesResourcesRequest.class}, new Object[]{request});
      Collection<RefreshNodesResourcesResponse> responses =
          method.invokeConcurrent(this, RefreshNodesResourcesResponse.class, targetSubCluster);
      // A non-empty result means the targeted subcluster completed the refresh.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededRefreshNodesResourcesRetrieved(clock.getTime() - begin);
        return RefreshNodesResourcesResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshNodesResources due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
    throw new YarnException("Unable to refreshNodesResources.");
  }

  /**
   * Adds node labels to the cluster of one specific subcluster. This call
   * requires a subClusterId.
   *
   * @param request AddToClusterNodeLabelsRequest; must carry a non-blank
   * subClusterId identifying the target subcluster.
   * @return a non-null AddToClusterNodeLabelsResponse on success
   * (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
      AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
    // Both the request and its subClusterId are mandatory here.
    if (request == null) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing AddToClusterNodeLabels request.", null);
    }

    String targetSubCluster = request.getSubClusterId();
    if (StringUtils.isBlank(targetSubCluster)) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing AddToClusterNodeLabels SubClusterId.", null);
    }

    try {
      long begin = clock.getTime();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[]{AddToClusterNodeLabelsRequest.class}, new Object[]{request});
      Collection<AddToClusterNodeLabelsResponse> responses =
          method.invokeConcurrent(this, AddToClusterNodeLabelsResponse.class, targetSubCluster);
      // A non-empty result means the targeted subcluster accepted the labels.
      if (CollectionUtils.isNotEmpty(responses)) {
        routerMetrics.succeededAddToClusterNodeLabelsRetrieved(clock.getTime() - begin);
        return AddToClusterNodeLabelsResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to addToClusterNodeLabels due to exception. " + e.getMessage());
    }

    routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
    throw new YarnException("Unable to addToClusterNodeLabels.");
  }

  /**
   * Removes node labels from the cluster of one specific subcluster. This
   * call requires a subClusterId.
   *
   * @param request RemoveFromClusterNodeLabelsRequest; must carry a non-blank
   * subClusterId identifying the target subcluster.
   * @return a non-null RemoveFromClusterNodeLabelsResponse on success
   * (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
      RemoveFromClusterNodeLabelsRequest request)
      throws YarnException, IOException {
    // parameter verification.
    if (request == null) {
      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RemoveFromClusterNodeLabels request.", null);
    }

    String subClusterId = request.getSubClusterId();
    if (StringUtils.isBlank(subClusterId)) {
      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RemoveFromClusterNodeLabels SubClusterId.",
          null);
    }

    try {
      long startTime = clock.getTime();
      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
          new Class[]{RemoveFromClusterNodeLabelsRequest.class}, new Object[]{request});
      // FIX: local was previously misnamed "refreshNodesResourcesResps"
      // (copy-paste from refreshNodesResources).
      Collection<RemoveFromClusterNodeLabelsResponse> removeFromClusterNodeLabelsResps =
          remoteMethod.invokeConcurrent(this, RemoveFromClusterNodeLabelsResponse.class,
          subClusterId);
      if (CollectionUtils.isNotEmpty(removeFromClusterNodeLabelsResps)) {
        long stopTime = clock.getTime();
        routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime);
        return RemoveFromClusterNodeLabelsResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to removeFromClusterNodeLabels due to exception. " + e.getMessage());
    }

    routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
    throw new YarnException("Unable to removeFromClusterNodeLabels.");
  }

  /**
   * Replaces the labels on nodes in one specific subcluster. This call
   * requires a subClusterId.
   *
   * @param request ReplaceLabelsOnNodeRequest; must carry a non-blank
   * subClusterId identifying the target subcluster.
   * @return a non-null ReplaceLabelsOnNodeResponse on success
   * (it carries no data).
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(ReplaceLabelsOnNodeRequest request)
      throws YarnException, IOException {
    // parameter verification.
    if (request == null) {
      routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing ReplaceLabelsOnNode request.", null);
    }

    String subClusterId = request.getSubClusterId();
    if (StringUtils.isBlank(subClusterId)) {
      routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing ReplaceLabelsOnNode SubClusterId.", null);
    }

    try {
      long startTime = clock.getTime();
      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
          new Class[]{ReplaceLabelsOnNodeRequest.class}, new Object[]{request});
      Collection<ReplaceLabelsOnNodeResponse> replaceLabelsOnNodeResps =
          remoteMethod.invokeConcurrent(this, ReplaceLabelsOnNodeResponse.class, subClusterId);
      if (CollectionUtils.isNotEmpty(replaceLabelsOnNodeResps)) {
        long stopTime = clock.getTime();
        // BUGFIX: the success path previously recorded
        // succeededRemoveFromClusterNodeLabelsRetrieved (copy-paste from the
        // method above), attributing this call's latency to the wrong metric.
        routerMetrics.succeededReplaceLabelsOnNodeRetrieved(stopTime - startTime);
        return ReplaceLabelsOnNodeResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to replaceLabelsOnNode due to exception. " + e.getMessage());
    }

    routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
    throw new YarnException("Unable to replaceLabelsOnNode.");
  }

  /**
   * Check which nodes are in the DECOMMISSIONING state on the specified
   * sub-cluster.
   *
   * @param request contains the id of the target sub-cluster.
   * @return response carrying the set of decommissioning nodes.
   * @throws YarnException if validation fails or the remote call fails.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
      CheckForDecommissioningNodesRequest request) throws YarnException, IOException {

    // Parameter check.
    // Increment the failure counter BEFORE throwing; the original code threw
    // first, so the counter was never updated on a null request.
    if (request == null) {
      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing checkForDecommissioningNodes request.", null);
    }

    String subClusterId = request.getSubClusterId();
    if (StringUtils.isBlank(subClusterId)) {
      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing checkForDecommissioningNodes SubClusterId.",
          null);
    }

    try {
      long startTime = clock.getTime();
      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
          new Class[]{CheckForDecommissioningNodesRequest.class}, new Object[]{request});

      Collection<CheckForDecommissioningNodesResponse> responses =
          remoteMethod.invokeConcurrent(this, CheckForDecommissioningNodesResponse.class,
          subClusterId);

      // We targeted a single subCluster, so a successful call yields exactly
      // one response; no intermediate list copy is needed to check that.
      if (CollectionUtils.isNotEmpty(responses) && responses.size() == 1) {
        CheckForDecommissioningNodesResponse response = responses.iterator().next();
        long stopTime = clock.getTime();
        routerMetrics.succeededCheckForDecommissioningNodesRetrieved((stopTime - startTime));
        Set<NodeId> nodes = response.getDecommissioningNodes();
        return CheckForDecommissioningNodesResponse.newInstance(nodes);
      }
    } catch (YarnException e) {
      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to checkForDecommissioningNodes due to exception " + e.getMessage());
    }

    routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
    throw new YarnException("Unable to checkForDecommissioningNodes.");
  }

  /**
   * Refresh the cluster max priority on the specified sub-cluster.
   *
   * @param request contains the id of the target sub-cluster.
   * @return an empty RefreshClusterMaxPriorityResponse on success.
   * @throws YarnException if validation fails or the remote call fails.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
      RefreshClusterMaxPriorityRequest request) throws YarnException, IOException {

    // Reject a missing request object.
    if (request == null) {
      routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshClusterMaxPriority request.", null);
    }

    // Reject a request without a target sub-cluster.
    String subClusterId = request.getSubClusterId();
    if (StringUtils.isBlank(subClusterId)) {
      routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing RefreshClusterMaxPriority SubClusterId.",
          null);
    }

    try {
      long begin = clock.getTime();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[]{RefreshClusterMaxPriorityRequest.class}, new Object[]{request});
      Collection<RefreshClusterMaxPriorityResponse> responses =
          method.invokeConcurrent(this, RefreshClusterMaxPriorityResponse.class, subClusterId);
      // A non-empty collection means the target sub-cluster handled the refresh.
      if (CollectionUtils.isNotEmpty(responses)) {
        long end = clock.getTime();
        routerMetrics.succeededRefreshClusterMaxPriorityRetrieved(end - begin);
        return RefreshClusterMaxPriorityResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to refreshClusterMaxPriority due to exception. " + e.getMessage());
    }

    routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
    throw new YarnException("Unable to refreshClusterMaxPriority.");
  }

  /**
   * Apply the node-to-attributes mapping on the specified sub-cluster.
   *
   * @param request contains the target subClusterId and the attribute mapping.
   * @return an empty NodesToAttributesMappingResponse on success.
   * @throws YarnException if validation fails or the remote call fails.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public NodesToAttributesMappingResponse mapAttributesToNodes(
      NodesToAttributesMappingRequest request) throws YarnException, IOException {
    // Reject a missing request object.
    if (request == null) {
      routerMetrics.incrMapAttributesToNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing mapAttributesToNodes request.", null);
    }

    // Reject a request without a target sub-cluster.
    String subClusterId = request.getSubClusterId();
    if (StringUtils.isBlank(subClusterId)) {
      routerMetrics.incrMapAttributesToNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing mapAttributesToNodes SubClusterId.", null);
    }

    try {
      long begin = clock.getTime();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[]{NodesToAttributesMappingRequest.class}, new Object[]{request});
      Collection<NodesToAttributesMappingResponse> responses =
          method.invokeConcurrent(this, NodesToAttributesMappingResponse.class, subClusterId);
      // A non-empty collection means the target sub-cluster applied the mapping.
      if (CollectionUtils.isNotEmpty(responses)) {
        long end = clock.getTime();
        routerMetrics.succeededMapAttributesToNodesRetrieved(end - begin);
        return NodesToAttributesMappingResponse.newInstance();
      }
    } catch (YarnException e) {
      routerMetrics.incrMapAttributesToNodesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to mapAttributesToNodes due to exception. " + e.getMessage());
    }

    routerMetrics.incrMapAttributesToNodesFailedRetrieved();
    throw new YarnException("Unable to mapAttributesToNodes.");
  }

  /**
   * Get the groups the given user belongs to, merged (deduplicated) across
   * all sub-clusters that answered the broadcast request.
   *
   * @param user the user name to look up; must not be blank.
   * @return the union of group names reported by the sub-clusters.
   * @throws IOException if the user is blank or the remote call fails.
   */
  @Override
  public String[] getGroupsForUser(String user) throws IOException {
    // Reject a blank user name up front.
    if (StringUtils.isBlank(user)) {
      routerMetrics.incrGetGroupsForUserFailedRetrieved();
      RouterServerUtil.logAndThrowIOException("Missing getGroupsForUser user.", null);
    }

    try {
      long begin = clock.getTime();
      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
          new Class[]{String.class}, new Object[]{user});
      // A null subClusterId broadcasts the call to every sub-cluster.
      Collection<String[]> perClusterGroups =
          method.invokeConcurrent(this, String[].class, null);
      if (CollectionUtils.isNotEmpty(perClusterGroups)) {
        long end = clock.getTime();
        // Merge the per-cluster arrays into a deduplicated set.
        Set<String> merged = new HashSet<>();
        for (String[] groupNames : perClusterGroups) {
          if (groupNames == null) {
            continue;
          }
          for (String groupName : groupNames) {
            merged.add(groupName);
          }
        }
        routerMetrics.succeededGetGroupsForUsersRetrieved(end - begin);
        return merged.toArray(new String[0]);
      }
    } catch (YarnException e) {
      routerMetrics.incrGetGroupsForUserFailedRetrieved();
      RouterServerUtil.logAndThrowIOException(e,
          "Unable to getGroupsForUser due to exception. " + e.getMessage());
    }

    routerMetrics.incrGetGroupsForUserFailedRetrieved();
    throw new IOException("Unable to getGroupsForUser.");
  }

  /**
   * Returns the facade used to talk to the federation state store.
   *
   * @return the FederationStateStoreFacade backing this interceptor.
   */
  @VisibleForTesting
  public FederationStateStoreFacade getFederationFacade() {
    return federationFacade;
  }

  /**
   * Returns the thread pool used for concurrent sub-cluster invocations.
   *
   * @return the ThreadPoolExecutor backing this interceptor.
   */
  @VisibleForTesting
  public ThreadPoolExecutor getExecutorService() {
    return executorService;
  }

  /**
   * In YARN Federation mode, mark sub-clusters whose heartbeat has been
   * silent for too long as SC_LOST.
   *
   * When the request carries a concrete subClusterId, only that sub-cluster
   * is examined; when it is blank, every active sub-cluster is checked.
   *
   * @param request deregisterSubCluster request, optionally naming one
   * sub-cluster to deregister.
   * @return Response listing the deregistration result per sub-cluster.
   * @throws YarnException exceptions from yarn servers.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
      throws YarnException, IOException {

    if (request == null) {
      routerMetrics.incrDeregisterSubClusterFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing DeregisterSubCluster request.", null);
    }

    try {
      long begin = clock.getTime();
      List<DeregisterSubClusters> results = new ArrayList<>();
      String requestedId = request.getSubClusterId();
      if (StringUtils.isNotBlank(requestedId)) {
        // A concrete id was supplied: only that sub-cluster is processed.
        results.add(deregisterSubCluster(requestedId));
      } else {
        // No id supplied: sweep every active sub-cluster; those whose
        // heartbeat timed out are moved to SC_LOST.
        Map<SubClusterId, SubClusterInfo> activeSubClusters =
            federationFacade.getSubClusters(true);
        for (SubClusterId scId : activeSubClusters.keySet()) {
          results.add(deregisterSubCluster(scId.getId()));
        }
      }
      long end = clock.getTime();
      routerMetrics.succeededDeregisterSubClusterRetrieved(end - begin);
      return DeregisterSubClusterResponse.newInstance(results);
    } catch (Exception e) {
      routerMetrics.incrDeregisterSubClusterFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to deregisterSubCluster due to exception. " + e.getMessage());
    }

    routerMetrics.incrDeregisterSubClusterFailedRetrieved();
    throw new YarnException("Unable to deregisterSubCluster.");
  }

  /**
   * Save the Queue Policy for the Federation.
   *
   * @param request saveFederationQueuePolicy Request.
   * @return Response from saveFederationQueuePolicy.
   * @throws YarnException exceptions from yarn servers.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {

    // Step1, parameter validation.

    if (request == null) {
      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing SaveFederationQueuePolicy request.", null);
    }

    FederationQueueWeight federationQueueWeight = request.getFederationQueueWeight();
    if (federationQueueWeight == null) {
      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing FederationQueueWeight information.", null);
    }

    String queue = request.getQueue();
    if (StringUtils.isBlank(queue)) {
      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing Queue information.", null);
    }

    // Only policy managers that understand queue weights are accepted.
    String policyManagerClassName = request.getPolicyManagerClassName();
    if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
      RouterServerUtil.logAndThrowException(policyManagerClassName +
          " does not support the use of queue weights.", null);
    }

    // Validate the weight strings before attempting to persist anything.
    String amRmWeight = federationQueueWeight.getAmrmWeight();
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);

    String routerWeight = federationQueueWeight.getRouterWeight();
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerWeight);

    String headRoomAlpha = federationQueueWeight.getHeadRoomAlpha();
    FederationQueueWeight.checkHeadRoomAlphaValid(headRoomAlpha);

    try {
      long startTime = clock.getTime();

      // Step2, parse amRMPolicyWeights.
      Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
      LOG.debug("amRMPolicyWeights = {}.", amRMPolicyWeights);

      // Step3, parse routerPolicyWeights.
      Map<SubClusterIdInfo, Float> routerPolicyWeights = getSubClusterWeightMap(routerWeight);
      // Log the router weights (the original logged amRMPolicyWeights here by mistake).
      LOG.debug("routerWeights = {}.", routerPolicyWeights);

      // Step4, Initialize WeightedPolicyInfo.
      WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo();
      weightedPolicyInfo.setHeadroomAlpha(Float.parseFloat(headRoomAlpha));
      weightedPolicyInfo.setAMRMPolicyWeights(amRMPolicyWeights);
      weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights);

      // Step5, persist the SubClusterPolicyConfiguration in the state store.
      SubClusterPolicyConfiguration policyConfiguration =
          SubClusterPolicyConfiguration.newInstance(queue, policyManagerClassName,
          weightedPolicyInfo.toByteBuffer());
      federationFacade.setPolicyConfiguration(policyConfiguration);
      long stopTime = clock.getTime();
      routerMetrics.succeededSaveFederationQueuePolicyRetrieved(stopTime - startTime);
      return SaveFederationQueuePolicyResponse.newInstance("save policy success.");
    } catch (Exception e) {
      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to saveFederationQueuePolicy due to exception. " + e.getMessage());
    }

    routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
    throw new YarnException("Unable to saveFederationQueuePolicy.");
  }

  /**
   * Batch Save the Queue Policies for the Federation.
   *
   * @param request BatchSaveFederationQueuePolicies Request
   * @return Response from batchSaveFederationQueuePolicies.
   * @throws YarnException exceptions from yarn servers.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {

    // Reject a missing request object.
    if (request == null) {
      routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "Missing BatchSaveFederationQueuePoliciesRequest request.", null);
    }

    // Reject a missing weight list.
    List<FederationQueueWeight> federationQueueWeights = request.getFederationQueueWeights();
    if (federationQueueWeights == null) {
      routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException("Missing FederationQueueWeights information.", null);
    }

    try {
      long begin = clock.getTime();
      // Persist each queue policy in turn; any failure aborts the batch.
      for (FederationQueueWeight queueWeight : federationQueueWeights) {
        saveFederationQueuePolicy(queueWeight);
      }
      long end = clock.getTime();
      routerMetrics.succeededBatchSaveFederationQueuePoliciesRetrieved(end - begin);
      return BatchSaveFederationQueuePoliciesResponse.newInstance("batch save policies success.");
    } catch (Exception e) {
      routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to batchSaveFederationQueuePolicies due to exception. " + e.getMessage());
    }

    routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
    throw new YarnException("Unable to batchSaveFederationQueuePolicies.");
  }

  /**
   * List the Queue Policies for the Federation.
   *
   * @param request QueryFederationQueuePolicies Request.
   * @return QueryFederationQueuePolicies Response.
   *
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public QueryFederationQueuePoliciesResponse listFederationQueuePolicies(
      QueryFederationQueuePoliciesRequest request) throws YarnException, IOException {

    // Reject a missing request object.
    if (request == null) {
      routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "Missing ListFederationQueuePolicies Request.", null);
    }

    // Both paging parameters must be strictly positive.
    if (request.getPageSize() <= 0) {
      routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "PageSize cannot be negative or zero.", null);
    }

    if (request.getCurrentPage() <= 0) {
      routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "CurrentPage cannot be negative or zero.", null);
    }

    try {
      long begin = clock.getTime();
      String queue = request.getQueue();
      List<String> queues = request.getQueues();
      int currentPage = request.getCurrentPage();
      int pageSize = request.getPageSize();

      LOG.info("queue = {}, queues={}, currentPage={}, pageSize={}",
          queue, queues, currentPage, pageSize);

      Map<String, SubClusterPolicyConfiguration> policiesConfigurations =
          federationFacade.getPoliciesConfigurations();

      QueryFederationQueuePoliciesResponse response;
      if (StringUtils.isNotBlank(queue)) {
        // A single queue name filters to at most one policy.
        response = filterPoliciesConfigurationsByQueue(queue, policiesConfigurations,
            pageSize, currentPage);
      } else if (CollectionUtils.isNotEmpty(queues)) {
        // A list of queue names may match several policies; paginate them.
        response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations,
            pageSize, currentPage);
      } else {
        // No filter at all: paginate over every configured policy.
        response = filterPoliciesConfigurations(policiesConfigurations, pageSize, currentPage);
      }
      long end = clock.getTime();
      routerMetrics.succeededListFederationQueuePoliciesRetrieved(end - begin);
      // Normalize a null (no match) into an empty response.
      if (response == null) {
        response = QueryFederationQueuePoliciesResponse.newInstance();
      }
      return response;
    } catch (Exception e) {
      routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to ListFederationQueuePolicies due to exception. " + e.getMessage());
    }

    routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
    throw new YarnException("Unable to listFederationQueuePolicies.");
  }

  /**
   * Delete the mapping of an application to its home sub-cluster from the
   * federation state store.
   *
   * @param request contains the application id (as a string) to delete.
   * @return a response carrying a success message.
   * @throws YarnException if validation fails or the deletion cannot be done.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public DeleteFederationApplicationResponse deleteFederationApplication(
      DeleteFederationApplicationRequest request) throws YarnException, IOException {

    // Parameter validation.
    if (request == null) {
      routerMetrics.incrDeleteFederationApplicationFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "Missing deleteFederationApplication Request.", null);
    }

    String application = request.getApplication();
    if (StringUtils.isBlank(application)) {
      routerMetrics.incrDeleteFederationApplicationFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "ApplicationId cannot be null.", null);
    }

    // Try calling deleteApplicationHomeSubCluster to delete the application.
    try {
      long startTime = clock.getTime();
      ApplicationId applicationId = ApplicationId.fromString(application);
      federationFacade.deleteApplicationHomeSubCluster(applicationId);
      long stopTime = clock.getTime();
      routerMetrics.succeededDeleteFederationApplicationFailedRetrieved(stopTime - startTime);
      return DeleteFederationApplicationResponse.newInstance(
          "applicationId = " + applicationId + " delete success.");
    } catch (Exception e) {
      // Count the failure, consistent with the other admin methods in this
      // class (the original code skipped the failure metric on this path).
      routerMetrics.incrDeleteFederationApplicationFailedRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to deleteFederationApplication due to exception. " + e.getMessage());
    }

    routerMetrics.incrDeleteFederationApplicationFailedRetrieved();
    throw new YarnException("Unable to deleteFederationApplication.");
  }

  /**
   * Get federation subcluster list.
   *
   * @param request GetSubClustersRequest Request.
   * @return SubClusters Response.
   * @throws YarnException exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public GetSubClustersResponse getFederationSubClusters(GetSubClustersRequest request)
       throws YarnException, IOException {

    // Parameter validation.
    if (request == null) {
      routerMetrics.incrGetFederationSubClustersFailedRetrieved();
      RouterServerUtil.logAndThrowException(
          "Missing getFederationSubClusters Request.", null);
    }

    // Step1. Get all subClusters of the cluster.
    Map<SubClusterId, SubClusterInfo> subClusters =
        federationFacade.getSubClusters(false);

    // Step2. Get FederationSubCluster data. A failure converting one
    // subCluster is logged and counted but does not abort the whole listing.
    List<FederationSubCluster> federationSubClusters = new ArrayList<>();
    long startTime = clock.getTime();
    for (Map.Entry<SubClusterId, SubClusterInfo> subCluster : subClusters.entrySet()) {
      SubClusterId subClusterId = subCluster.getKey();
      try {
        SubClusterInfo subClusterInfo = subCluster.getValue();
        long lastHeartBeat = subClusterInfo.getLastHeartBeat();
        Date lastHeartBeatDate = new Date(lastHeartBeat);
        FederationSubCluster federationSubCluster = FederationSubCluster.newInstance(
            subClusterId.getId(), subClusterInfo.getState().name(), lastHeartBeatDate.toString());
        federationSubClusters.add(federationSubCluster);
      } catch (Exception e) {
        routerMetrics.incrGetFederationSubClustersFailedRetrieved();
        // SLF4J substitutes {} placeholders; the original "%s" was never
        // interpolated, so the subCluster id was missing from the log line.
        LOG.error("getSubClusters SubClusterId = [{}] error.", subClusterId, e);
      }
    }
    long stopTime = clock.getTime();
    routerMetrics.succeededGetFederationSubClustersRetrieved(stopTime - startTime);

    // Step3. Return results.
    return GetSubClustersResponse.newInstance(federationSubClusters);
  }

  /**
   * Delete Policies based on the provided queue list.
   *
   * @param request DeleteFederationQueuePoliciesRequest Request.
   * @return If the deletion is successful, the queue deletion success message will be returned.
   * @throws YarnException indicates exceptions from yarn servers.
   * @throws IOException io error occurs.
   */
  @Override
  public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues(
      DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException {

    // Parameter validation.
    if (request == null) {
      routerMetrics.incrDeleteFederationPoliciesByQueuesRetrieved();
      RouterServerUtil.logAndThrowException(
          "Missing deleteFederationQueuePoliciesByQueues Request.", null);
    }

    List<String> queues = request.getQueues();
    if (CollectionUtils.isEmpty(queues)) {
      routerMetrics.incrDeleteFederationPoliciesByQueuesRetrieved();
      RouterServerUtil.logAndThrowException("queues cannot be null.", null);
    }

    // Delete the policy configurations of the given queues from the state
    // store. (The original comment here wrongly referred to
    // deleteApplicationHomeSubCluster — a copy-paste from another method.)
    try {
      long startTime = clock.getTime();
      federationFacade.deletePolicyConfigurations(queues);
      long stopTime = clock.getTime();
      routerMetrics.succeededDeleteFederationPoliciesByQueuesRetrieved(stopTime - startTime);
      return DeleteFederationQueuePoliciesResponse.newInstance(
         "queues = " + StringUtils.join(queues, ",") + " delete success.");
    } catch (Exception e) {
      // Count the failure, consistent with the other admin methods in this
      // class (the original code skipped the failure metric on this path).
      routerMetrics.incrDeleteFederationPoliciesByQueuesRetrieved();
      RouterServerUtil.logAndThrowException(e,
          "Unable to deleteFederationPoliciesByQueues due to exception. " + e.getMessage());
    }

    routerMetrics.incrDeleteFederationPoliciesByQueuesRetrieved();
    throw new YarnException("Unable to deleteFederationPoliciesByQueues.");
  }

  /**
   * Look up the policy configured for a single queue; at most one result can
   * match, so no pagination is needed.
   *
   * @param queue queueName.
   * @param policiesConfigurations policy configurations.
   * @param pageSize Items per page.
   * @param currentPage The number of pages to be queried.
   * @return federation queue policies response, or null when nothing matches.
   * @throws YarnException indicates exceptions from yarn servers.
   *
   */
  private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueue(String queue,
      Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
      int pageSize, int currentPage) throws YarnException {

    // No policies configured at all: nothing to return.
    if (MapUtils.isEmpty(policiesConfigurations)) {
      return null;
    }

    // No policy for this particular queue: nothing to return.
    SubClusterPolicyConfiguration conf = policiesConfigurations.get(queue);
    if (conf == null) {
      return null;
    }

    // Exactly one matching queue: wrap it in a single-element list.
    List<FederationQueueWeight> weights = new ArrayList<>();
    weights.add(parseFederationQueueWeight(queue, conf));

    return QueryFederationQueuePoliciesResponse.newInstance(
        1, 1, currentPage, pageSize, weights);
  }

  /**
   * Filter queue configuration information based on the queue list.
   *
   * @param queues The name of the queue.
   * @param policiesConfigurations policy configurations.
   * @param pageSize Items per page.
   * @param currentPage The number of pages to be queried.
   * @return federation queue policies response.
   * @throws YarnException indicates exceptions from yarn servers.
   */
  private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueues(
      List<String> queues, Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
      int pageSize, int currentPage) throws YarnException {

    // No policies configured at all: nothing to return.
    if (MapUtils.isEmpty(policiesConfigurations)) {
      return null;
    }

    // Collect the weights for every requested queue that has a policy.
    List<FederationQueueWeight> matched = new ArrayList<>();
    for (String queueName : queues) {
      SubClusterPolicyConfiguration conf = policiesConfigurations.get(queueName);
      if (conf != null) {
        FederationQueueWeight weight = parseFederationQueueWeight(queueName, conf);
        if (weight != null) {
          matched.add(weight);
        }
      }
    }

    // Paginate the matched results.
    return queryFederationQueuePoliciesPagination(matched, pageSize, currentPage);
  }

  /**
   * Filter PoliciesConfigurations, and we paginate Policies within this method.
   *
   * @param policiesConfigurations policy configurations.
   * @param pageSize Items per page.
   * @param currentPage The number of pages to be queried.
   * @return federation queue policies response.
   * @throws YarnException indicates exceptions from yarn servers.
   */
  private QueryFederationQueuePoliciesResponse filterPoliciesConfigurations(
      Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
      int pageSize, int currentPage) throws YarnException {

    // No policies configured at all: nothing to return.
    if (MapUtils.isEmpty(policiesConfigurations)) {
      return null;
    }

    // Convert every non-null policy configuration into a FederationQueueWeight.
    List<FederationQueueWeight> weights = new ArrayList<>();
    for (Map.Entry<String, SubClusterPolicyConfiguration> item :
        policiesConfigurations.entrySet()) {
      SubClusterPolicyConfiguration conf = item.getValue();
      if (conf == null) {
        continue;
      }
      FederationQueueWeight weight = parseFederationQueueWeight(item.getKey(), conf);
      if (weight != null) {
        weights.add(weight);
      }
    }

    // Paginate the collected results.
    return queryFederationQueuePoliciesPagination(weights, pageSize, currentPage);
  }

  /**
   * Pagination for FederationQueuePolicies.
   *
   * @param queueWeights List Of FederationQueueWeight.
   * @param pageSize Items per page.
   * @param currentPage The number of pages to be queried.
   * @return federation queue policies response.
   * @throws YarnException indicates exceptions from yarn servers.
   */
  private QueryFederationQueuePoliciesResponse queryFederationQueuePoliciesPagination(
      List<FederationQueueWeight> queueWeights, int pageSize, int currentPage)
      throws YarnException {
    // Nothing to paginate.
    if (CollectionUtils.isEmpty(queueWeights)) {
      return null;
    }

    int total = queueWeights.size();
    // Window for the requested page, clamped to the list size.
    int from = (currentPage - 1) * pageSize;
    int to = Math.min(from + pageSize, total);

    // The requested page starts past the end of the data.
    if (from > to) {
      throw new YarnException("The index of the records to be retrieved " +
          "has exceeded the maximum index.");
    }

    List<FederationQueueWeight> page = queueWeights.subList(from, to);

    // Total number of pages, rounded up.
    int totalPage =
        (total % pageSize == 0) ? total / pageSize : (total / pageSize) + 1;

    return QueryFederationQueuePoliciesResponse.newInstance(
        total, totalPage, currentPage, pageSize, page);
  }

  /**
   * Parses a FederationQueueWeight from the given queue and SubClusterPolicyConfiguration.
   *
   * @param queue The name of the queue.
   * @param policyConf policy configuration.
   * @return Queue weights for representing Federation, or null when policyConf is null.
   * @throws YarnException YarnException indicates exceptions from yarn servers.
   */
  private FederationQueueWeight parseFederationQueueWeight(String queue,
      SubClusterPolicyConfiguration policyConf) throws YarnException {

    if (policyConf == null) {
      return null;
    }

    // Decode the serialized WeightedPolicyInfo stored with the configuration.
    ByteBuffer params = policyConf.getParams();
    WeightedPolicyInfo policyInfo = WeightedPolicyInfo.fromByteBuffer(params);

    // Render both weight maps as "SC-1:0.7,SC-2:0.3" style strings.
    String amrmPolicyWeight = parsePolicyWeights(policyInfo.getAMRMPolicyWeights());
    String routerPolicyWeight = parsePolicyWeights(policyInfo.getRouterPolicyWeights());

    // Validate the rendered weight strings; these throw on malformed weights.
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amrmPolicyWeight);
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerPolicyWeight);

    return FederationQueueWeight.newInstance(routerPolicyWeight, amrmPolicyWeight,
        String.valueOf(policyInfo.getHeadroomAlpha()), queue, policyConf.getType());
  }

  /**
   * Renders the given weight map as a comma-separated string of
   * {@code subClusterId:weight} pairs, e.g. {@code SC-1:0.7,SC-2:0.3}.
   *
   * @param policyWeights
   *        A map containing SubClusterIdInfo as keys and corresponding weight values.
   * @return A string representation of the parsed policy weights,
   *         or null when the map is null or empty.
   */
  protected String parsePolicyWeights(Map<SubClusterIdInfo, Float> policyWeights) {
    if (MapUtils.isEmpty(policyWeights)) {
      return null;
    }
    return policyWeights.entrySet().stream()
        .map(entry -> entry.getKey().toId() + ":" + entry.getValue())
        .collect(Collectors.joining(","));
  }

  /**
   * Save FederationQueuePolicy.
   *
   * Validates the queue name, the policy manager class, and both weight
   * ratio strings, then persists the resulting
   * {@code SubClusterPolicyConfiguration} via the federation facade.
   *
   * @param federationQueueWeight queue weight.
   * @throws YarnException exceptions from yarn servers.
   */
  private void saveFederationQueuePolicy(FederationQueueWeight federationQueueWeight)
      throws YarnException {

    // Step1, Check whether the weight setting of the queue is as expected.
    String queue = federationQueueWeight.getQueue();
    String policyManagerClassName = federationQueueWeight.getPolicyManagerClassName();

    if (StringUtils.isBlank(queue)) {
      RouterServerUtil.logAndThrowException("Missing Queue information.", null);
    }

    if (StringUtils.isBlank(policyManagerClassName)) {
      RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null);
    }

    if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
      // Leading space added so the message does not run into the class name.
      RouterServerUtil.logAndThrowException(policyManagerClassName +
              " does not support the use of queue weights.", null);
    }

    String amRmWeight = federationQueueWeight.getAmrmWeight();
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);

    String routerWeight = federationQueueWeight.getRouterWeight();
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerWeight);

    String headRoomAlpha = federationQueueWeight.getHeadRoomAlpha();
    FederationQueueWeight.checkHeadRoomAlphaValid(headRoomAlpha);

    // Step2, parse amRMPolicyWeights.
    Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
    LOG.debug("amRMPolicyWeights = {}.", amRMPolicyWeights);

    // Step3, parse routerPolicyWeights.
    Map<SubClusterIdInfo, Float> routerPolicyWeights = getSubClusterWeightMap(routerWeight);
    // Bug fix: this previously logged amRMPolicyWeights under the router label.
    LOG.debug("routerPolicyWeights = {}.", routerPolicyWeights);

    // Step4, Initialize WeightedPolicyInfo.
    WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo();
    weightedPolicyInfo.setHeadroomAlpha(Float.parseFloat(headRoomAlpha));
    weightedPolicyInfo.setAMRMPolicyWeights(amRMPolicyWeights);
    weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights);

    // Step5, Set SubClusterPolicyConfiguration.
    SubClusterPolicyConfiguration policyConfiguration =
        SubClusterPolicyConfiguration.newInstance(queue, policyManagerClassName,
        weightedPolicyInfo.toByteBuffer());
    federationFacade.setPolicyConfiguration(policyConfiguration);
  }

  /**
   * Get the Map of SubClusterWeight.
   *
   * This method can parse the Weight information of Router and
   * the Weight information of AMRMProxy.
   *
   * An example of a parsed string is as follows:
   * SC-1:0.7,SC-2:0.3
   *
   * @param policyWeight policyWeight.
   * @return Map of SubClusterWeight.
   * @throws YarnException exceptions from yarn servers.
   */
  protected Map<SubClusterIdInfo, Float> getSubClusterWeightMap(String policyWeight)
      throws YarnException {
    // Reject malformed weight strings before attempting to parse them.
    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(policyWeight);
    Map<SubClusterIdInfo, Float> weightMap = new HashMap<>();
    for (String weightItem : policyWeight.split(COMMA)) {
      // Each item is of the form "<subClusterId>:<weight>".
      String[] idAndWeight = weightItem.split(COLON);
      weightMap.put(new SubClusterIdInfo(idAndWeight[0]), Float.valueOf(idAndWeight[1]));
    }
    return weightMap;
  }

  /**
   * deregisterSubCluster by SubClusterId.
   *
   * Never propagates an exception: any YarnException raised while reading
   * the sub-cluster state or deregistering is converted into a FAILED
   * DeregisterSubClusters record.
   *
   * @param reqSubClusterId subClusterId.
   * @return a DeregisterSubClusters record describing the outcome
   *         (SUCCESS / FAILED / NONE) for the requested sub-cluster.
   */
  private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) {

    DeregisterSubClusters deregisterSubClusters;

    try {
      // Step1. Get subCluster information.
      SubClusterId subClusterId = SubClusterId.newInstance(reqSubClusterId);
      SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
      SubClusterState subClusterState = subClusterInfo.getState();
      long lastHeartBeat = subClusterInfo.getLastHeartBeat();
      Date lastHeartBeatDate = new Date(lastHeartBeat);
      // Default result: nothing was deregistered ("NONE"); the branches
      // below overwrite the state/information fields as appropriate.
      deregisterSubClusters = DeregisterSubClusters.newInstance(
          reqSubClusterId, "NONE", lastHeartBeatDate.toString(),
          "Normal Heartbeat", subClusterState.name());

      // Step2. Deregister subCluster.
      if (subClusterState.isUsable()) {
        LOG.warn("Deregister SubCluster {} in State {} last heartbeat at {}.",
            subClusterId, subClusterState, lastHeartBeatDate);
        // heartbeat interval time. (variable name typo: "Bear" for "Beat")
        long heartBearTimeInterval = Time.now() - lastHeartBeat;
        // NOTE(review): this branch fires when the interval is SMALLER than
        // heartbeatExpirationMillis, yet the success message below claims
        // "Heartbeat Time >= N minutes". Either the comparison or the
        // message appears inverted — confirm the intended semantics.
        if (heartBearTimeInterval - heartbeatExpirationMillis < 0) {
          boolean deregisterSubClusterFlag =
              federationFacade.deregisterSubCluster(subClusterId, SubClusterState.SC_LOST);
          if (deregisterSubClusterFlag) {
            deregisterSubClusters.setDeregisterState("SUCCESS");
            deregisterSubClusters.setSubClusterState("SC_LOST");
            deregisterSubClusters.setInformation("Heartbeat Time >= " +
                heartbeatExpirationMillis / (1000 * 60) + "minutes");
          } else {
            deregisterSubClusters.setDeregisterState("FAILED");
            deregisterSubClusters.setInformation("DeregisterSubClusters Failed.");
          }
        }
      } else {
        // Sub-cluster is already in an unusable state; report failure
        // without touching the state store.
        deregisterSubClusters.setDeregisterState("FAILED");
        deregisterSubClusters.setInformation("The subCluster is Unusable, " +
            "So it can't be Deregistered");
        LOG.warn("The SubCluster {} is Unusable (SubClusterState:{}), So it can't be Deregistered",
            subClusterId, subClusterState);
      }
      return deregisterSubClusters;
    } catch (YarnException e) {
      // Convert the failure into a FAILED record instead of rethrowing.
      LOG.error("SubCluster {} DeregisterSubCluster Failed", reqSubClusterId, e);
      deregisterSubClusters = DeregisterSubClusters.newInstance(
          reqSubClusterId, "FAILED", "UNKNOWN", e.getMessage(), "UNKNOWN");
      return deregisterSubClusters;
    }
  }
}