FederationInterceptorREST.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.router.webapp;

import java.io.IOException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.net.HttpHeaders;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntry;
import org.apache.hadoop.yarn.server.router.RouterAuditLogger;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.dao.ConfInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade.getRandomActiveSubCluster;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NEW_APP;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_CLUSTERINFO;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_CLUSTERUSERINFO;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_SCHEDULERINFO;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.DUMP_SCHEDULERLOGS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_ACTIVITIES;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_BULKACTIVITIES;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APPACTIVITIES;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APPSTATISTICS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NODETOLABELS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_RMNODELABELS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_LABELSTONODES;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_WEB_SERVICE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.REPLACE_LABELSONNODES;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.REPLACE_LABELSONNODE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_CLUSTER_NODELABELS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.ADD_TO_CLUSTER_NODELABELS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.REMOVE_FROM_CLUSTERNODELABELS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_LABELS_ON_NODE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_PRIORITY;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_QUEUEINFO;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UPDATE_APPLICATIONPRIORITY;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UPDATE_APP_QUEUE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.POST_DELEGATION_TOKEN;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.POST_DELEGATION_TOKEN_EXPIRATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.CANCEL_DELEGATIONTOKEN;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NEW_RESERVATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_RESERVATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UPDATE_RESERVATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.DELETE_RESERVATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.LIST_RESERVATIONS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_TIMEOUT;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_TIMEOUTS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UPDATE_APPLICATIONTIMEOUTS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APPLICATION_ATTEMPTS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.CHECK_USER_ACCESS_TO_QUEUE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_ATTEMPT;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_CONTAINERS;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_CONTAINER;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UPDATE_SCHEDULER_CONFIGURATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_SCHEDULER_CONFIGURATION;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SIGNAL_TOCONTAINER;
import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.extractToken;
import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.getKerberosUserGroupInformation;

/**
 * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
 * implementation for federation of YARN RM and scaling an application across
 * multiple YARN SubClusters. All the federation specific implementation is
 * encapsulated in this class. This is always the last interceptor in the chain.
 */
public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {

  private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptorREST.class);

  private int numSubmitRetries;
  private FederationStateStoreFacade federationFacade;
  private RouterPolicyFacade policyFacade;
  private RouterMetrics routerMetrics;
  private final Clock clock = new MonotonicClock();
  private boolean returnPartialReport;
  private boolean appInfosCacheEnabled;
  private int appInfosCacheCount;
  private boolean allowPartialResult;
  private long submitIntervalTime;

  private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
  private LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appInfosCaches;

  /**
   * Thread pool used for asynchronous operations.
   */
  private ExecutorService threadpool;

  @Override
  public void init(String user) {

    super.init(user);

    federationFacade = FederationStateStoreFacade.getInstance(getConf());

    final Configuration conf = this.getConf();

    try {
      SubClusterResolver subClusterResolver =
          this.federationFacade.getSubClusterResolver();
      policyFacade = new RouterPolicyFacade(
          conf, federationFacade, subClusterResolver, null);
    } catch (FederationPolicyInitializationException e) {
      throw new YarnRuntimeException(e);
    }

    numSubmitRetries = conf.getInt(
        YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
        YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);

    interceptors = new HashMap<>();
    routerMetrics = RouterMetrics.getMetrics();
    threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
        .setNameFormat("FederationInterceptorREST #%d")
        .build());

    returnPartialReport = conf.getBoolean(
        YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);

    appInfosCacheEnabled = conf.getBoolean(
        YarnConfiguration.ROUTER_APPSINFO_ENABLED,
        YarnConfiguration.DEFAULT_ROUTER_APPSINFO_ENABLED);

    if (appInfosCacheEnabled) {
      appInfosCacheCount = conf.getInt(
          YarnConfiguration.ROUTER_APPSINFO_CACHED_COUNT,
          YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT);
      appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
    }

    allowPartialResult = conf.getBoolean(
        YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
        YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);

    submitIntervalTime = conf.getTimeDuration(
        YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
        YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
  }

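  /**
   * Return the cached REST interceptor for the given SubCluster, or null (with
   * an error log) if no interceptor has been created for that SubCluster yet.
   *
   * @param subClusterId the SubCluster to look up.
   * @return the cached DefaultRequestInterceptorREST, or null if absent.
   */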
  @VisibleForTesting
  protected DefaultRequestInterceptorREST getInterceptorForSubCluster(SubClusterId subClusterId) {
    if (interceptors.containsKey(subClusterId)) {
      return interceptors.get(subClusterId);
    } else {
      LOG.error("The interceptor for SubCluster {} does not exist in the cache.",
          subClusterId);
      return null;
    }
  }

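  /**
   * Create, initialize and cache a REST interceptor for the given SubCluster.
   * The interceptor class is read from
   * {@code YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS} and must
   * be a subclass of DefaultRequestInterceptorREST.
   *
   * @param subClusterId the SubCluster the interceptor will talk to.
   * @param webAppAddress the RM web application address of the SubCluster.
   * @return the newly created interceptor.
   */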
  private DefaultRequestInterceptorREST createInterceptorForSubCluster(
      SubClusterId subClusterId, String webAppAddress) {

    final Configuration conf = this.getConf();

    String interceptorClassName = conf.get(
        YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);

    DefaultRequestInterceptorREST interceptorInstance;
    try {
      Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
      if (DefaultRequestInterceptorREST.class.isAssignableFrom(interceptorClass)) {
        interceptorInstance =
            (DefaultRequestInterceptorREST) ReflectionUtils.newInstance(interceptorClass, conf);
        String userName = getUser().getUserName();
        interceptorInstance.init(userName);
      } else {
        throw new YarnRuntimeException("Class: " + interceptorClassName + " not instance of "
            + DefaultRequestInterceptorREST.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " +
          interceptorClassName, e);
    }

    String webAppAddressWithScheme = WebAppUtils.getHttpSchemePrefix(conf) + webAppAddress;
    interceptorInstance.setWebAppAddress(webAppAddressWithScheme);
    interceptorInstance.setSubClusterId(subClusterId);
    interceptors.put(subClusterId, interceptorInstance);
    return interceptorInstance;
  }

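  /**
   * Get (or lazily create) the REST interceptor for the SubCluster described
   * by the given SubClusterInfo, using its RM web service address.
   *
   * @param subClusterInfo the SubCluster descriptor, may be null.
   * @return the interceptor, or null if subClusterInfo is null.
   */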
  protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
      SubClusterInfo subClusterInfo) {
    if (subClusterInfo != null) {
      final SubClusterId subClusterId = subClusterInfo.getSubClusterId();
      final String webServiceAddress = subClusterInfo.getRMWebServiceAddress();
      return getOrCreateInterceptorForSubCluster(subClusterId, webServiceAddress);
    }
    return null;
  }

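  /**
   * Get (or lazily create) the REST interceptor for the home SubCluster of the
   * given application, after validating the applicationId.
   *
   * @param appId the application whose home SubCluster is looked up.
   * @return the interceptor for the home SubCluster.
   * @throws YarnException if the home SubCluster of the application cannot be resolved.
   */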
  protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String appId)
      throws YarnException {
    // We first check the applicationId
    RouterServerUtil.validateApplicationId(appId);

    // Get homeSubCluster By appId
    SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
    LOG.info("appId = {} : subClusterInfo = {}.", appId, subClusterInfo.getSubClusterId());
    return getOrCreateInterceptorForSubCluster(subClusterInfo);
  }

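  /**
   * Get (or lazily create) the REST interceptor for the SubCluster that
   * currently reports the given node.
   *
   * @param nodeId the node whose SubCluster is looked up.
   * @return the interceptor for that SubCluster.
   */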
  protected DefaultRequestInterceptorREST getOrCreateInterceptorByNodeId(String nodeId) {
    SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
    return getOrCreateInterceptorForSubCluster(subClusterInfo);
  }

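  /**
   * Get the cached REST interceptor for the given SubCluster, creating a new
   * one if it is missing or if its web application address has changed.
   *
   * @param subClusterId the SubCluster to talk to.
   * @param webAppAddress the RM web application address of the SubCluster.
   * @return the interceptor for the SubCluster.
   */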
  @VisibleForTesting
  protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
      SubClusterId subClusterId, String webAppAddress) {
    DefaultRequestInterceptorREST interceptor = getInterceptorForSubCluster(subClusterId);
    String webAppAddressWithScheme =
        WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress;
    if (interceptor == null || !webAppAddressWithScheme.equals(interceptor.getWebAppAddress())) {
      interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress);
    }
    return interceptor;
  }

  /**
   * The YARN Router forwards every getNewApplication request to any RM. During
   * this operation there is no communication with the State Store. The Router
   * forwards the request to any SubCluster and retries it on up to
   * #numSubmitRetries different SubClusters, randomly chosen from the active
   * ones.
   * <p>
   * Possible failures and behaviors:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit.
   * <p>
   * ResourceManager: the Router will time out and contact another RM.
   * <p>
   * StateStore: not in the execution.
   */
  @Override
  public Response createNewApplication(HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException {

    long startTime = clock.getTime();

    try {
      Map<SubClusterId, SubClusterInfo> subClustersActive =
          federationFacade.getSubClusters(true);

      // We declare blackList and retries.
      List<SubClusterId> blackList = new ArrayList<>();
      int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
      Response response = ((FederationActionRetry<Response>) (retryCount) ->
          invokeGetNewApplication(subClustersActive, blackList, hsr, retryCount)).
          runWithRetries(actualRetryNums, submitIntervalTime);

      // If the response is not empty and the status is SC_OK,
      // this request can be returned directly.
      if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
        long stopTime = clock.getTime();
        routerMetrics.succeededAppsCreated(stopTime - startTime);
        return response;
      }
    } catch (FederationPolicyException e) {
      // If a FederationPolicyException is thrown, the service is unavailable.
      routerMetrics.incrAppsFailedCreated();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_APP, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
    } catch (Exception e) {
      routerMetrics.incrAppsFailedCreated();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_APP, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
    }

    // return error message directly.
    String errMsg = "Failed to create a new application.";
    LOG.error(errMsg);
    routerMetrics.incrAppsFailedCreated();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_APP, UNKNOWN,
        TARGET_WEB_SERVICE, errMsg);
    return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
  }

  /**
   * Invoke GetNewApplication to different subClusters.
   *
   * @param subClustersActive Active SubClusters.
   * @param blackList Blacklist used to avoid repeated calls to unavailable subClusters.
   * @param hsr HttpServletRequest.
   * @param retryCount number of retries.
   * @return the response. If the response is empty or its status is not SC_OK,
   * the request failed; if it is not empty and its status is SC_OK, the request succeeded.
   * @throws YarnException yarn exception.
   * @throws IOException io error.
   * @throws InterruptedException interrupted exception.
   */
  private Response invokeGetNewApplication(Map<SubClusterId, SubClusterInfo> subClustersActive,
      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
      throws YarnException, IOException, InterruptedException {

    SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive, blackList);

    LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);

    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId,
        subClustersActive.get(subClusterId).getRMWebServiceAddress());

    try {
      Response response = interceptor.createNewApplication(hsr);
      if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
        NewApplication entity = response.readEntity(NewApplication.class);
        ApplicationId applicationId = ApplicationId.fromString(entity.getApplicationId());
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_NEW_APP,
            TARGET_WEB_SERVICE, applicationId, subClusterId);
        return Response.status(Status.OK).entity(entity).build();
      }
    } catch (Exception e) {
      blackList.add(subClusterId);
      RouterServerUtil.logAndThrowException(e.getMessage(), e);
    }

    // We need to throw the exception directly.
    String msg = String.format("Unable to create a new ApplicationId in SubCluster %s.",
        subClusterId.getId());
    throw new YarnException(msg);
  }

  /**
   * Today, YARN does not perform any checks on the applicationId that is submitted.
   * <p>
   * Base scenarios:
   * <p>
   * The Client submits an application to the Router. The Router selects one
   * SubCluster to forward the request. The Router inserts a tuple into
   * StateStore with the selected SubCluster (e.g. SC1) and the appId. The
   * State Store replies with the selected SubCluster (e.g. SC1). The Router
   * submits the request to the selected SubCluster.
   * <p>
   * In case of State Store failure:
   * <p>
   * The client submits an application to the Router. The Router selects one
   * SubCluster to forward the request. The Router inserts a tuple into State
   * Store with the selected SubCluster (e.g. SC1) and the appId. Because the
   * State Store is down, the Router times out and retries depending on the
   * FederationFacade settings. The Router replies to the client with an error
   * message.
   * <p>
   * If State Store fails after inserting the tuple: identical behavior as
   * {@code RMWebServices}.
   * <p>
   * In case of Router failure:
   * <p>
   * Scenario 1 - Crash before submission to the ResourceManager
   * <p>
   * The Client submits an application to the Router. The Router selects one
   * SubCluster to forward the request. The Router inserts a tuple into State
   * Store with the selected SubCluster (e.g. SC1) and the appId. The Router
   * crashes. The Client times out and resubmits the application. The Router
   * selects one SubCluster to forward the request. The Router inserts a tuple
   * into State Store with the selected SubCluster (e.g. SC2) and the appId.
   * Because the tuple is already inserted in the State Store, it returns the
   * previous selected SubCluster (e.g. SC1). The Router submits the request
   * to the selected SubCluster (e.g. SC1).
   * <p>
   * Scenario 2 - Crash after submission to the ResourceManager
   * <p>
   * The Client submits an application to the Router. The Router selects one
   * SubCluster to forward the request. The Router inserts a tuple into State
   * Store with the selected SubCluster (e.g. SC1) and the appId. The Router
   * submits the request to the selected SubCluster. The Router crashes. The
   * Client times out and resubmits the application. The Router selects one
   * SubCluster to forward the request. The Router inserts a tuple into State
   * Store with the selected SubCluster (e.g. SC2) and the appId. The State
   * Store replies with the selected SubCluster (e.g. SC1). The Router submits
   * the request to the selected SubCluster (e.g. SC1). When a client re-submits
   * the same application to the same RM, the RM does not raise an exception and
   * replies with an operation-successful message.
   * <p>
   * In case of Client failure: identical behavior as {@code RMWebServices}.
   * <p>
   * In case of ResourceManager failure:
   * <p>
   * The Client submits an application to the Router. The Router selects one
   * SubCluster to forward the request. The Router inserts a tuple into State
   * Store with the selected SubCluster (e.g. SC1) and the appId. The Router
   * submits the request to the selected SubCluster. The entire SubCluster is
   * down - all the RMs in HA or the master RM is not reachable. The Router
   * times out. The Router selects a new SubCluster to forward the request.
   * The Router updates the tuple in the State Store with the selected SubCluster
   * (e.g. SC2) and the appId. The State Store replies with an OK answer. The
   * Router submits the request to the selected SubCluster (e.g. SC2).
   */
  @Override
  public Response submitApplication(ApplicationSubmissionContextInfo newApp, HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException {

    long startTime = clock.getTime();

    // We verify the parameters to ensure that newApp is not empty and
    // that the format of applicationId is correct.
    if (newApp == null || newApp.getApplicationId() == null) {
      routerMetrics.incrAppsFailedSubmitted();
      String errMsg = "Missing ApplicationSubmissionContextInfo or "
          + "applicationSubmissionContext information.";
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
          TARGET_WEB_SERVICE, errMsg);
      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
    }

    try {
      String applicationId = newApp.getApplicationId();
      RouterServerUtil.validateApplicationId(applicationId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrAppsFailedSubmitted();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
          TARGET_WEB_SERVICE, e.getMessage());
      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()).build();
    }

    List<SubClusterId> blackList = new ArrayList<>();
    try {
      int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
      int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
      Response response = ((FederationActionRetry<Response>) (retryCount) ->
          invokeSubmitApplication(newApp, blackList, hsr, retryCount)).
          runWithRetries(actualRetryNums, submitIntervalTime);
      if (response != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
        return response;
      }
    } catch (Exception e) {
      routerMetrics.incrAppsFailedSubmitted();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
          TARGET_WEB_SERVICE, e.getMessage());
      return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
    }

    routerMetrics.incrAppsFailedSubmitted();
    String errMsg = String.format("Application %s with appId %s failed to be submitted.",
        newApp.getApplicationName(), newApp.getApplicationId());
    LOG.error(errMsg);
    RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
        TARGET_WEB_SERVICE, errMsg);
    return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
  }

  /**
   * Invoke SubmitApplication to different subClusters.
   *
   * @param submissionContext application submission context.
   * @param blackList Blacklist used to avoid repeated calls to unavailable subClusters.
   * @param hsr HttpServletRequest.
   * @param retryCount number of retries.
   * @return the response. If the response is empty or its status is not SC_ACCEPTED,
   * the request failed; if it is not empty and its status is SC_ACCEPTED,
   * the request succeeded.
   * @throws YarnException yarn exception.
   * @throws IOException io error.
   */
  private Response invokeSubmitApplication(ApplicationSubmissionContextInfo submissionContext,
      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
      throws YarnException, IOException, InterruptedException {

    // Step1. We convert ApplicationSubmissionContextInfo to ApplicationSubmissionContext
    // and Prepare parameters.
    ApplicationSubmissionContext context =
        RMWebAppUtil.createAppSubmissionContext(submissionContext, this.getConf());
    ApplicationId applicationId = ApplicationId.fromString(submissionContext.getApplicationId());
    SubClusterId subClusterId = null;

    try {
      // Get subClusterId from policy.
      subClusterId = policyFacade.getHomeSubcluster(context, blackList);

      // Print the log of submitting the submitApplication.
      LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
          applicationId, retryCount, subClusterId);

      // Step2. We Store the mapping relationship
      // between Application and HomeSubCluster in stateStore.
      ApplicationSubmissionContext trimmedAppSubmissionContext =
          RouterServerUtil.getTrimmedAppSubmissionContext(context);
      federationFacade.addOrUpdateApplicationHomeSubCluster(
          applicationId, subClusterId, retryCount, trimmedAppSubmissionContext);

      // Step3. We get subClusterInfo based on subClusterId.
      SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
      if (subClusterInfo == null) {
        throw new YarnException("Can't Find SubClusterId = " + subClusterId);
      }

      // Step4. Submit the request, if the response is HttpServletResponse.SC_ACCEPTED,
      // We return the response, otherwise we throw an exception.
      Response response = getOrCreateInterceptorForSubCluster(subClusterId,
          subClusterInfo.getRMWebServiceAddress()).submitApplication(submissionContext, hsr);
      if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
        LOG.info("Application {} with appId {} submitted on {}.",
            context.getApplicationName(), applicationId, subClusterId);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), SUBMIT_NEW_APP,
            TARGET_WEB_SERVICE, applicationId, subClusterId);
        return response;
      }
      String msg = String.format("application %s failed to be submitted.", applicationId);
      throw new YarnException(msg);
    } catch (Exception e) {
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
          TARGET_WEB_SERVICE, e.getMessage(), applicationId, subClusterId);
      LOG.warn("Unable to submit the application {} to SubCluster {}.", applicationId,
          subClusterId, e);
      if (subClusterId != null) {
        blackList.add(subClusterId);
      }
      throw e;
    }
  }

  /**
   * The YARN Router will forward to the respective YARN RM in which the AM is
   * running.
   * <p>
   * Possible failure:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit the request.
   * <p>
   * ResourceManager: the Router will time out and the call will fail.
   * <p>
   * State Store: the Router will time out and it will retry depending on the
   * FederationFacade settings - if the failure happened before the select
   * operation.
   */
  @Override
  public AppInfo getApp(HttpServletRequest hsr, String appId, Set<String> unselectedFields) {

    try {
      long startTime = clock.getTime();

      // Get SubClusterInfo according to applicationId
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      if (interceptor == null) {
        routerMetrics.incrAppsFailedRetrieved();
        return null;
      }
      AppInfo response = interceptor.getApp(hsr, appId, unselectedFields);
      long stopTime = clock.getTime();
      routerMetrics.succeededAppsRetrieved(stopTime - startTime);
      return response;
    } catch (YarnException e) {
      routerMetrics.incrAppsFailedRetrieved();
      LOG.error("getApp Error, applicationId = {}.", appId, e);
      return null;
    } catch (IllegalArgumentException e) {
      routerMetrics.incrAppsFailedRetrieved();
      throw e;
    }
  }

  /**
   * The YARN Router will forward to the respective YARN RM in which the AM is
   * running.
   * <p>
   * Possible failures and behaviors:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit the request.
   * <p>
   * ResourceManager: the Router will time out and the call will fail.
   * <p>
   * State Store: the Router will time out and it will retry depending on the
   * FederationFacade settings - if the failure happened before the select
   * operation.
   */
  @Override
  public Response updateAppState(AppState targetState, HttpServletRequest hsr, String appId)
      throws AuthorizationException, YarnException, InterruptedException, IOException {

    long startTime = clock.getTime();

    ApplicationId applicationId;
    try {
      applicationId = ApplicationId.fromString(appId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrAppsFailedKilled();
      return Response
          .status(Status.BAD_REQUEST)
          .entity(e.getLocalizedMessage())
          .build();
    }

    SubClusterInfo subClusterInfo;
    SubClusterId subClusterId;
    try {
      subClusterId =
          federationFacade.getApplicationHomeSubCluster(applicationId);
      subClusterInfo = federationFacade.getSubCluster(subClusterId);
    } catch (YarnException e) {
      routerMetrics.incrAppsFailedKilled();
      return Response
          .status(Status.BAD_REQUEST)
          .entity(e.getLocalizedMessage())
          .build();
    }

    Response response = getOrCreateInterceptorForSubCluster(subClusterId,
        subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
            hsr, appId);

    long stopTime = clock.getTime();
    routerMetrics.succeededAppsRetrieved(stopTime - startTime);

    return response;
  }

  /**
   * The YARN Router will forward the request to all the YARN RMs in parallel,
   * after that it will group all the ApplicationReports by the ApplicationId.
   * <p>
   * Possible failure:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit the request.
   * <p>
   * ResourceManager: the Router calls each YARN RM in parallel by using one
   * thread for each YARN RM. In case a YARN RM fails, a single call will time
   * out. However, the Router will merge the ApplicationReports it got and
   * provide a partial list to the client.
   * <p>
   * State Store: the Router will time out and it will retry depending on the
   * FederationFacade settings - if the failure happened before the select
   * operation.
   */
  @Override
  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
      Set<String> statesQuery, String finalStatusQuery, String userQuery,
      String queueQuery, String count, String startedBegin, String startedEnd,
      String finishBegin, String finishEnd, Set<String> applicationTypes,
      Set<String> applicationTags, String name, Set<String> unselectedFields) {

    RouterAppInfoCacheKey routerAppInfoCacheKey = RouterAppInfoCacheKey.newInstance(
        hsr, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery, count,
        startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes,
        applicationTags, name, unselectedFields);

    if (appInfosCacheEnabled && routerAppInfoCacheKey != null) {
      if (appInfosCaches.containsKey(routerAppInfoCacheKey)) {
        return appInfosCaches.get(routerAppInfoCacheKey);
      }
    }

    AppsInfo apps = new AppsInfo();
    long startTime = clock.getTime();

    // HttpServletRequest does not work with ExecutorCompletionService.
    // Create a duplicate hsr.
    final HttpServletRequest hsrCopy = clone(hsr);
    Collection<SubClusterInfo> subClusterInfos = federationFacade.getActiveSubClusters();

    List<AppsInfo> appsInfos = subClusterInfos.parallelStream().map(subCluster -> {
      try {
        DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subCluster);
        AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, statesQuery, finalStatusQuery,
            userQuery, queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd,
            applicationTypes, applicationTags, name, unselectedFields);
        if (rmApps != null) {
          return rmApps;
        }
      } catch (Exception e) {
        LOG.warn("Failed to get application report.", e);
      }
      routerMetrics.incrMultipleAppsFailedRetrieved();
      LOG.error("Subcluster {} failed to return appReport.", subCluster.getSubClusterId());
      return null;
    }).collect(Collectors.toList());

    appsInfos.forEach(appsInfo -> {
      if (appsInfo != null) {
        apps.addAll(appsInfo.getApps());
        long stopTime = clock.getTime();
        routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
      }
    });

    if (apps.getApps().isEmpty()) {
      return new AppsInfo();
    }

    // Merge all the application reports got from all the available YARN RMs
    AppsInfo resultAppsInfo = RouterWebServiceUtil.mergeAppsInfo(
        apps.getApps(), returnPartialReport);

    if (appInfosCacheEnabled && routerAppInfoCacheKey != null) {
      appInfosCaches.put(routerAppInfoCacheKey, resultAppsInfo);
    }

    return resultAppsInfo;
  }

  /**
   * Get a copy of an HTTP request. This is for thread safety.
   * @param hsr HTTP servlet request to copy.
   * @return Copy of the HTTP request.
   */
  private HttpServletRequestWrapper clone(final HttpServletRequest hsr) {
    if (hsr == null) {
      return null;
    }

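    // Capture the request values eagerly: the wrapped request is handed to
    // worker threads and may not be safely accessed outside the original
    // servlet thread.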
    final Map<String, String[]> parameterMap = hsr.getParameterMap();
    final String pathInfo = hsr.getPathInfo();
    final String user = hsr.getRemoteUser();
    final Principal principal = hsr.getUserPrincipal();
    final String mediaType = RouterWebServiceUtil.getMediaTypeFromHttpServletRequest(
        hsr, AppsInfo.class);
    return new HttpServletRequestWrapper(hsr) {
        public Map<String, String[]> getParameterMap() {
          return parameterMap;
        }
        public String getPathInfo() {
          return pathInfo;
        }
        public String getRemoteUser() {
          return user;
        }
        public Principal getUserPrincipal() {
          return principal;
        }
        public String getHeader(String value) {
          // we override only Accept
          if (value.equals(HttpHeaders.ACCEPT)) {
            return mediaType;
          }
          return null;
        }
      };
  }

  /**
   * Get an active subcluster in the federation by its id.
   *
   * @param subClusterId the id of the subcluster.
   * @return the SubClusterInfo of the requested subcluster.
   * @throws NotFoundException If the subcluster cannot be found.
   */
  private SubClusterInfo getActiveSubCluster(String subClusterId)
      throws NotFoundException {
    try {
      SubClusterId pSubClusterId = SubClusterId.newInstance(subClusterId);
      return federationFacade.getSubCluster(pSubClusterId);
    } catch (YarnException e) {
      throw new NotFoundException(e.getMessage());
    }
  }

  /**
   * The YARN Router will forward the request to all the SubClusters to find
   * where the node is running.
   * <p>
   * Possible failure:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit the request.
   * <p>
   * ResourceManager: the Router will time out and the call will fail.
   * <p>
   * State Store: the Router will time out and it will retry depending on the
   * FederationFacade settings - if the failure happened before the select
   * operation.
   */
  @Override
  public NodeInfo getNode(String nodeId) {

    final Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();

    if (subClustersActive.isEmpty()) {
      throw new NotFoundException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
    }

    final Map<SubClusterInfo, NodeInfo> results = getNode(subClustersActive, nodeId);

    // Collect the responses
    NodeInfo nodeInfo = null;
    for (NodeInfo nodeResponse : results.values()) {
      try {
        // Check if the node was already found in a different SubCluster and
        // it has an old health report
        if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
            nodeResponse.getLastHealthUpdate()) {
          nodeInfo = nodeResponse;
        }
      } catch (Throwable e) {
        LOG.warn("Failed to get node report ", e);
      }
    }

    if (nodeInfo == null) {
      throw new NotFoundException("nodeId, " + nodeId + ", is not found");
    }
    return nodeInfo;
  }

  /**
   * Get a node and the subcluster where it is.
   *
   * @param subClusters Subclusters where to search.
   * @param nodeId      Identifier of the node we are looking for.
   * @return Map between subcluster and node.
   */
  private Map<SubClusterInfo, NodeInfo> getNode(Collection<SubClusterInfo> subClusters,
      String nodeId) {

    // Parallel traversal of subClusters
    Stream<Pair<SubClusterInfo, NodeInfo>> pairStream = subClusters.parallelStream().map(
        subClusterInfo -> {
            final SubClusterId subClusterId = subClusterInfo.getSubClusterId();
            try {
              DefaultRequestInterceptorREST interceptor =
                   getOrCreateInterceptorForSubCluster(subClusterInfo);
              return Pair.of(subClusterInfo, interceptor.getNode(nodeId));
            } catch (Exception e) {
              LOG.error("Subcluster {} failed to return nodeInfo.", subClusterId, e);
              return null;
            }
        });

    // Collect the results
    final Map<SubClusterInfo, NodeInfo> results = new HashMap<>();
    pairStream.forEach(pair -> {
      if (pair != null) {
        SubClusterInfo subCluster = pair.getKey();
        NodeInfo nodeInfo = pair.getValue();
        results.put(subCluster, nodeInfo);
      }
    });

    return results;
  }

  /**
   * Get the subcluster a node belongs to.
   *
   * @param nodeId Identifier of the node we are looking for.
   * @return The subcluster containing the node.
   * @throws NotFoundException If the node cannot be found.
   */
  private SubClusterInfo getNodeSubcluster(String nodeId) throws NotFoundException {

    final Collection<SubClusterInfo> subClusters = federationFacade.getActiveSubClusters();
    final Map<SubClusterInfo, NodeInfo> results = getNode(subClusters, nodeId);

    SubClusterInfo subcluster = null;
    NodeInfo nodeInfo = null;
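    // Keep the SubCluster whose node report has the most recent health update.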
    for (Entry<SubClusterInfo, NodeInfo> entry : results.entrySet()) {
      NodeInfo nodeResponse = entry.getValue();
      if (nodeInfo == null || (nodeResponse != null &&
          nodeInfo.getLastHealthUpdate() < nodeResponse.getLastHealthUpdate())) {
        subcluster = entry.getKey();
        nodeInfo = nodeResponse;
      }
    }
    if (subcluster == null) {
      throw new NotFoundException("Cannot find " + nodeId + " in any subcluster");
    }
    return subcluster;
  }

  /**
   * The YARN Router will forward the request to all the YARN RMs in parallel,
   * after that it will remove all the duplicated NodeInfo by using the NodeId.
   * <p>
   * Possible failure:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit the request.
   * <p>
   * ResourceManager: the Router calls each YARN RM in parallel by using one
   * thread for each YARN RM. In case a YARN RM fails, a single call will time
   * out. However, the Router will use the NodesInfo it got and provide a
   * partial list to the client.
   * <p>
   * State Store: the Router will time out and it will retry depending on the
   * FederationFacade settings - if the failure happened before the select
   * operation.
   */
  @Override
  public NodesInfo getNodes(String states) {

    NodesInfo nodes = new NodesInfo();
    try {
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
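      // Invoke getNodes(states) on every active SubCluster concurrently.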
      Class[] argsClasses = new Class[]{String.class};
      Object[] args = new Object[]{states};
      ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args);
      Map<SubClusterInfo, NodesInfo> nodesMap =
          invokeConcurrent(subClustersActive, remoteMethod, NodesInfo.class);
      nodesMap.values().forEach(nodesInfo -> nodes.addAll(nodesInfo.getNodes()));
    } catch (NotFoundException e) {
      LOG.error("get all active sub cluster(s) error.", e);
      throw e;
    } catch (YarnException e) {
      LOG.error("getNodes by states = {} error.", states, e);
      throw new YarnRuntimeException(e);
    } catch (IOException e) {
      LOG.error("getNodes by states = {} error with io error.", states, e);
      throw new YarnRuntimeException(e);
    }

    // Remove duplicates from the node reports obtained from all the available
    // YARN RMs. Nodes can be moved from one subcluster to another; in that
    // case they appear as LOST/RUNNING in the previous SubCluster and
    // NEW/RUNNING in the new one.
    return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
  }

  /**
   * This method changes the resources of a specific node, and it is reachable
   * by using {@link RMWSConsts#NODE_RESOURCE}.
   *
   * @param hsr The servlet request.
   * @param nodeId The node whose resources we want to update.
   *               It is a PathParam.
   * @param resourceOption The resource change.
   * @return the resources of a specific node.
   */
  @Override
  public ResourceInfo updateNodeResource(HttpServletRequest hsr,
      String nodeId, ResourceOptionInfo resourceOption) {
    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByNodeId(nodeId);
    return interceptor.updateNodeResource(hsr, nodeId, resourceOption);
  }

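  /**
   * The YARN Router queries all the active SubClusters in parallel and merges
   * their ClusterMetricsInfo into a single federation-wide view. SubClusters
   * that fail to respond are skipped.
   */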
  @Override
  public ClusterMetricsInfo getClusterMetricsInfo() {
    ClusterMetricsInfo metrics = new ClusterMetricsInfo();

    Collection<SubClusterInfo> subClusterInfos = federationFacade.getActiveSubClusters();

    Stream<ClusterMetricsInfo> clusterMetricsInfoStream = subClusterInfos.parallelStream()
        .map(subClusterInfo -> {
          DefaultRequestInterceptorREST interceptor =
              getOrCreateInterceptorForSubCluster(subClusterInfo);
          try {
            return interceptor.getClusterMetricsInfo();
          } catch (Exception e) {
            LOG.error("Subcluster {} failed to return Cluster Metrics.",
                subClusterInfo.getSubClusterId(), e);
            return null;
          }
        });

    clusterMetricsInfoStream.forEach(clusterMetricsInfo -> {
      try {
        if (clusterMetricsInfo != null) {
          RouterWebServiceUtil.mergeMetrics(metrics, clusterMetricsInfo);
        }
      } catch (Throwable e) {
        LOG.warn("Failed to merge Cluster Metrics.", e);
      }
    });

    return metrics;
  }

  /**
   * The YARN Router will forward to the respective YARN RM in which the AM is
   * running.
   * <p>
   * Possible failure:
   * <p>
   * Client: identical behavior as {@code RMWebServices}.
   * <p>
   * Router: the Client will time out and resubmit the request.
   * <p>
   * ResourceManager: the Router will time out and the call will fail.
   * <p>
   * State Store: the Router will time out and it will retry depending on the
   * FederationFacade settings - if the failure happened before the select
   * operation.
   */
  @Override
  public AppState getAppState(HttpServletRequest hsr, String appId)
      throws AuthorizationException {
    try {
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      if (interceptor != null) {
        return interceptor.getAppState(hsr, appId);
      }
    } catch (YarnException | IllegalArgumentException e) {
      LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e);
      return null;
    }
    return new AppState();
  }

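  /**
   * Delegates to {@link #getClusterInfo()}.
   */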
  @Override
  public ClusterInfo get() {
    return getClusterInfo();
  }

  /**
   * This method retrieves the cluster information, and it is reachable by using
   * {@link RMWSConsts#INFO}.
   *
   * In Federation mode, we will return a FederationClusterInfo object,
   * which contains a set of ClusterInfo.
   *
   * @return the cluster information.
   */
  @Override
  public ClusterInfo getClusterInfo() {
    try {
      long startTime = Time.now();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      Class[] argsClasses = new Class[]{};
      Object[] args = new Object[]{};
      ClientMethod remoteMethod = new ClientMethod("getClusterInfo", argsClasses, args);
      Map<SubClusterInfo, ClusterInfo> subClusterInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, ClusterInfo.class);
      FederationClusterInfo federationClusterInfo = new FederationClusterInfo();
      subClusterInfoMap.forEach((subClusterInfo, clusterInfo) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        clusterInfo.setSubClusterId(subClusterId.getId());
        federationClusterInfo.getList().add(clusterInfo);
      });
      long stopTime = Time.now();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_CLUSTERINFO,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededGetClusterInfoRetrieved(stopTime - startTime);
      return federationClusterInfo;
    } catch (NotFoundException e) {
      routerMetrics.incrGetClusterInfoFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTERINFO, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
    } catch (YarnException | IOException e) {
      routerMetrics.incrGetClusterInfoFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTERINFO, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getClusterInfo error.", e);
    }
    routerMetrics.incrGetClusterInfoFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTERINFO, UNKNOWN,
        TARGET_WEB_SERVICE, "getClusterInfo error.");
    throw new RuntimeException("getClusterInfo error.");
  }

  /**
   * This method retrieves the cluster user information, and it is reachable by using
   * {@link RMWSConsts#CLUSTER_USER_INFO}.
   *
   * In Federation mode, we will return a FederationClusterUserInfo object,
   * which contains a list of ClusterUserInfo.
   *
   * @param hsr the servlet request
   * @return the cluster user information
   */
  @Override
  public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) {
    try {
      long startTime = Time.now();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class};
      Object[] args = new Object[]{hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("getClusterUserInfo", argsClasses, args);
      Map<SubClusterInfo, ClusterUserInfo> subClusterInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, ClusterUserInfo.class);
      FederationClusterUserInfo federationClusterUserInfo = new FederationClusterUserInfo();
      subClusterInfoMap.forEach((subClusterInfo, clusterUserInfo) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        clusterUserInfo.setSubClusterId(subClusterId.getId());
        federationClusterUserInfo.getList().add(clusterUserInfo);
      });
      long stopTime = Time.now();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_CLUSTERUSERINFO,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededGetClusterUserInfoRetrieved(stopTime - startTime);
      return federationClusterUserInfo;
    } catch (NotFoundException e) {
      routerMetrics.incrGetClusterUserInfoFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTERUSERINFO, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
    } catch (YarnException | IOException e) {
      routerMetrics.incrGetClusterUserInfoFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTERUSERINFO, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getClusterUserInfo error.", e);
    }
    routerMetrics.incrGetClusterUserInfoFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTERUSERINFO, UNKNOWN,
        TARGET_WEB_SERVICE, "getClusterUserInfo error.");
    throw new RuntimeException("getClusterUserInfo error.");
  }

  /**
   * This method retrieves the current scheduler status, and it is reachable by
   * using {@link RMWSConsts#SCHEDULER}.
   * In Federation mode, the SchedulerTypeInfo of the subClusters cannot be merged
   * into a single view, so each entry is tagged with the subClusterId it belongs to.
   *
   * @return the current scheduler status
   */
  @Override
  public SchedulerTypeInfo getSchedulerInfo() {
    try {
      long startTime = Time.now();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      Class[] argsClasses = new Class[]{};
      Object[] args = new Object[]{};
      ClientMethod remoteMethod = new ClientMethod("getSchedulerInfo", argsClasses, args);
      Map<SubClusterInfo, SchedulerTypeInfo> subClusterInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, SchedulerTypeInfo.class);
      FederationSchedulerTypeInfo federationSchedulerTypeInfo = new FederationSchedulerTypeInfo();
      subClusterInfoMap.forEach((subClusterInfo, schedulerTypeInfo) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        schedulerTypeInfo.setSubClusterId(subClusterId.getId());
        federationSchedulerTypeInfo.getList().add(schedulerTypeInfo);
      });
      long stopTime = Time.now();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_SCHEDULERINFO,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededGetSchedulerInfoRetrieved(stopTime - startTime);
      return federationSchedulerTypeInfo;
    } catch (NotFoundException e) {
      routerMetrics.incrGetSchedulerInfoFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_SCHEDULERINFO, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
    } catch (YarnException | IOException e) {
      routerMetrics.incrGetSchedulerInfoFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_SCHEDULERINFO, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getSchedulerInfo error.", e);
    }
    routerMetrics.incrGetSchedulerInfoFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_SCHEDULERINFO, UNKNOWN,
        TARGET_WEB_SERVICE, "getSchedulerInfo error.");
    throw new RuntimeException("getSchedulerInfo error.");
  }

  /**
   * This method dumps the scheduler logs for the time period given in input, and it is
   * reachable by using {@link RMWSConsts#SCHEDULER_LOGS}.
   *
   * @param time the period of time. It is a FormParam.
   * @param hsr the servlet request
   * @return the result of the operation
   * @throws IOException when it cannot create the dump log file.
   */
  @Override
  public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
      throws IOException {

    // Step1. Check the time parameter to ensure that
    // it is neither null nor empty and is greater than 0.

    if (StringUtils.isBlank(time)) {
      routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DUMP_SCHEDULERLOGS, UNKNOWN,
          TARGET_WEB_SERVICE, "Parameter error, the time is empty or null.");
      throw new IllegalArgumentException("Parameter error, the time is empty or null.");
    }

    try {
      int period = Integer.parseInt(time);
      if (period <= 0) {
        throw new IllegalArgumentException("time must be greater than 0.");
      }
    } catch (NumberFormatException e) {
      routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DUMP_SCHEDULERLOGS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw new IllegalArgumentException("time must be a number.");
    } catch (IllegalArgumentException e) {
      routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DUMP_SCHEDULERLOGS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    // Step2. Call dumpSchedulerLogs of each subcluster.
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{String.class, HttpServletRequest.class};
      Object[] args = new Object[]{time, hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("dumpSchedulerLogs", argsClasses, args);
      Map<SubClusterInfo, String> dumpSchedulerLogsMap = invokeConcurrent(
          subClustersActive, remoteMethod, String.class);
      StringBuilder stringBuilder = new StringBuilder();
      dumpSchedulerLogsMap.forEach((subClusterInfo, msg) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        stringBuilder.append("subClusterId")
            .append(subClusterId).append(" : ").append(msg).append("; ");
      });
      long stopTime = clock.getTime();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), DUMP_SCHEDULERLOGS,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededDumpSchedulerLogsRetrieved(stopTime - startTime);
      return stringBuilder.toString();
    } catch (IllegalArgumentException e) {
      routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DUMP_SCHEDULERLOGS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to dump SchedulerLogs by time: %s.", time);
    } catch (YarnException e) {
      routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DUMP_SCHEDULERLOGS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "dumpSchedulerLogs by time = %s error .", time);
    }

    routerMetrics.incrDumpSchedulerLogsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), DUMP_SCHEDULERLOGS, UNKNOWN,
        TARGET_WEB_SERVICE, "dumpSchedulerLogs Failed.");
    throw new RuntimeException("dumpSchedulerLogs Failed.");
  }
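  // A sketch of the string assembled by dumpSchedulerLogs above, assuming two hypothetical
  // subClusters "SC-1" and "SC-2" that both succeed and an assumed per-RM message:
  //
  //   "subClusterIdSC-1 : Capacity scheduler logs are being created.; " +
  //   "subClusterIdSC-2 : Capacity scheduler logs are being created.; "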

  /**
   * This method retrieves all the activities in a specific node, and it is
   * reachable by using {@link RMWSConsts#SCHEDULER_ACTIVITIES}.
   *
   * @param hsr the servlet request
   * @param nodeId the node for which we want to retrieve the activities. It is a
   *          QueryParam.
   * @param groupBy the groupBy type by which the activities should be
   *          aggregated. It is a QueryParam.
   * @return all the activities in the specific node
   */
  @Override
  public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
      String groupBy) {
    try {
      // Check the parameters to ensure that the parameters are not empty
      Validate.checkNotNullAndNotEmpty(nodeId, "nodeId");
      Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");

      // Query the SubClusterInfo by nodeId; if no SubClusterInfo can be found for the
      // nodeId, an exception is thrown directly.
      // Then call the corresponding subCluster to get the ActivitiesInfo.
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByNodeId(nodeId);
      final HttpServletRequest hsrCopy = clone(hsr);
      ActivitiesInfo activitiesInfo = interceptor.getActivities(hsrCopy, nodeId, groupBy);
      if (activitiesInfo != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_ACTIVITIES,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededGetActivitiesLatencyRetrieved(stopTime - startTime);
        return activitiesInfo;
      }
    } catch (IllegalArgumentException | NotFoundException e) {
      routerMetrics.incrGetActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_ACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_ACTIVITIES, UNKNOWN,
        TARGET_WEB_SERVICE, "getActivities Failed.");
    routerMetrics.incrGetActivitiesFailedRetrieved();
    throw new RuntimeException("getActivities Failed.");
  }

  /**
   * This method retrieves the last n activities inside the scheduler, and it is
   * reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}.
   *
   * @param hsr the servlet request
   * @param groupBy the groupBy type by which the activities should be
   *        aggregated. It is a QueryParam.
   * @param activitiesCount number of activities
   * @return last n activities
   */
  @Override
  public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
      String groupBy, int activitiesCount) throws InterruptedException {
    try {
      long startTime = clock.getTime();

      // Step1. Check the parameters to ensure that the parameters are not empty
      Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");
      Validate.checkNotNegative(activitiesCount, "activitiesCount");

      // Step2. Call the interface of subCluster concurrently and get the returned result.
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class, int.class};
      Object[] args = new Object[]{hsrCopy, groupBy, activitiesCount};
      ClientMethod remoteMethod = new ClientMethod("getBulkActivities", argsClasses, args);
      Map<SubClusterInfo, BulkActivitiesInfo> appStatisticsMap = invokeConcurrent(
          subClustersActive, remoteMethod, BulkActivitiesInfo.class);

      // Step3. Generate Federation objects and set subCluster information.
      FederationBulkActivitiesInfo fedBulkActivitiesInfo = new FederationBulkActivitiesInfo();
      appStatisticsMap.forEach((subClusterInfo, bulkActivitiesInfo) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        bulkActivitiesInfo.setSubClusterId(subClusterId.getId());
        fedBulkActivitiesInfo.getList().add(bulkActivitiesInfo);
      });
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_BULKACTIVITIES,
          TARGET_WEB_SERVICE);
      long stopTime = clock.getTime();
      routerMetrics.succeededGetBulkActivitiesRetrieved(stopTime - startTime);
      return fedBulkActivitiesInfo;
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_BULKACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    } catch (NotFoundException e) {
      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_BULKACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
    } catch (IOException e) {
      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_BULKACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "getBulkActivities by groupBy = %s, activitiesCount = %s with io error.",
          groupBy, String.valueOf(activitiesCount));
    } catch (YarnException e) {
      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_BULKACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "getBulkActivities by groupBy = %s, activitiesCount = %s with yarn error.",
          groupBy, String.valueOf(activitiesCount));
    }

    routerMetrics.incrGetBulkActivitiesFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_BULKACTIVITIES, UNKNOWN,
        TARGET_WEB_SERVICE, "getBulkActivities Failed.");
    throw new RuntimeException("getBulkActivities Failed.");
  }

  @Override
  public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
      String appId, String time, Set<String> requestPriorities,
      Set<String> allocationRequestIds, String groupBy, String limit,
      Set<String> actions, boolean summarize) {

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      final HttpServletRequest hsrCopy = clone(hsr);
      AppActivitiesInfo appActivitiesInfo = interceptor.getAppActivities(hsrCopy, appId, time,
          requestPriorities, allocationRequestIds, groupBy, limit, actions, summarize);
      if (appActivitiesInfo != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetAppActivitiesRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APPACTIVITIES,
            TARGET_WEB_SERVICE);
        return appActivitiesInfo;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetAppActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get subCluster by appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrGetAppActivitiesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPACTIVITIES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "getAppActivities by appId = %s error .", appId);
    }
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPACTIVITIES, UNKNOWN,
        TARGET_WEB_SERVICE, "getAppActivities Failed.");
    routerMetrics.incrGetAppActivitiesFailedRetrieved();
    throw new RuntimeException("getAppActivities Failed.");
  }

  @Override
  public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
      Set<String> stateQueries, Set<String> typeQueries) {
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class};
      Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries};
      ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args);
      Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent(
          subClustersActive, remoteMethod, ApplicationStatisticsInfo.class);
      ApplicationStatisticsInfo applicationStatisticsInfo  =
          RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values());
      if (applicationStatisticsInfo != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetAppStatisticsRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APPSTATISTICS,
            TARGET_WEB_SERVICE);
        return applicationStatisticsInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrGetAppStatisticsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPSTATISTICS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
    } catch (IOException e) {
      routerMetrics.incrGetAppStatisticsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPSTATISTICS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "getAppStatistics error by stateQueries = %s, typeQueries = %s with io error.",
          StringUtils.join(stateQueries, ","), StringUtils.join(typeQueries, ","));
    } catch (YarnException e) {
      routerMetrics.incrGetAppStatisticsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPSTATISTICS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "getAppStatistics by stateQueries = %s, typeQueries = %s with yarn error.",
          StringUtils.join(stateQueries, ","), StringUtils.join(typeQueries, ","));
    }
    routerMetrics.incrGetAppStatisticsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPSTATISTICS, UNKNOWN,
        TARGET_WEB_SERVICE, "getAppStatistics Failed.");
    throw RouterServerUtil.logAndReturnRunTimeException(
        "getAppStatistics by stateQueries = %s, typeQueries = %s Failed.",
        StringUtils.join(stateQueries, ","), StringUtils.join(typeQueries, ","));
  }

  @Override
  public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
      throws IOException {
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class};
      Object[] args = new Object[]{hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", argsClasses, args);
      Map<SubClusterInfo, NodeToLabelsInfo> nodeToLabelsInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, NodeToLabelsInfo.class);
      NodeToLabelsInfo nodeToLabelsInfo =
          RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfoMap);
      if (nodeToLabelsInfo != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_NODETOLABELS,
            TARGET_WEB_SERVICE);
        return nodeToLabelsInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrNodeToLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NODETOLABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrNodeToLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NODETOLABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("getNodeToLabels error.", e);
    }
    routerMetrics.incrNodeToLabelsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NODETOLABELS, UNKNOWN,
        TARGET_WEB_SERVICE, "getNodeToLabels Failed.");
    throw new RuntimeException("getNodeToLabels Failed.");
  }

  @Override
  public NodeLabelsInfo getRMNodeLabels(HttpServletRequest hsr) throws IOException {
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class};
      Object[] args = new Object[]{hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("getRMNodeLabels", argsClasses, args);
      Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, NodeLabelsInfo.class);
      NodeLabelsInfo nodeToLabelsInfo =
          RouterWebServiceUtil.mergeNodeLabelsInfo(nodeToLabelsInfoMap);
      if (nodeToLabelsInfo != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetRMNodeLabelsRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_RMNODELABELS,
            TARGET_WEB_SERVICE);
        return nodeToLabelsInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrGetRMNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_RMNODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrGetRMNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_RMNODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("getRMNodeLabels error.", e);
    }
    routerMetrics.incrGetRMNodeLabelsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_RMNODELABELS, UNKNOWN,
        TARGET_WEB_SERVICE, "getRMNodeLabels Failed.");
    throw new RuntimeException("getRMNodeLabels Failed.");
  }

  @Override
  public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
      throws IOException {
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      Class[] argsClasses = new Class[]{Set.class};
      Object[] args = new Object[]{labels};
      ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", argsClasses, args);
      Map<SubClusterInfo, LabelsToNodesInfo> labelsToNodesInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, LabelsToNodesInfo.class);
      Map<NodeLabelInfo, NodeIDsInfo> labelToNodesMap = new HashMap<>();
      labelsToNodesInfoMap.values().forEach(labelsToNode -> {
        Map<NodeLabelInfo, NodeIDsInfo> values = labelsToNode.getLabelsToNodes();
        for (Map.Entry<NodeLabelInfo, NodeIDsInfo> item : values.entrySet()) {
          NodeLabelInfo key = item.getKey();
          NodeIDsInfo leftValue = item.getValue();
          NodeIDsInfo rightValue = labelToNodesMap.getOrDefault(key, null);
          NodeIDsInfo newValue = NodeIDsInfo.add(leftValue, rightValue);
          labelToNodesMap.put(key, newValue);
        }
      });
      LabelsToNodesInfo labelsToNodesInfo = new LabelsToNodesInfo(labelToNodesMap);
      if (labelsToNodesInfo != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_LABELSTONODES,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
        return labelsToNodesInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrLabelsToNodesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_LABELSTONODES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrLabelsToNodesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_LABELSTONODES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException(
          e, "getLabelsToNodes by labels = %s with yarn error.", StringUtils.join(labels, ","));
    }
    routerMetrics.incrLabelsToNodesFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_LABELSTONODES, UNKNOWN,
        TARGET_WEB_SERVICE, "getLabelsToNodes Failed.");
    throw RouterServerUtil.logAndReturnRunTimeException(
        "getLabelsToNodes by labels = %s Failed.", StringUtils.join(labels, ","));
  }
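  // A minimal sketch of the merge performed in getLabelsToNodes above, using a hypothetical
  // label "gpu" and hypothetical node names: per-subCluster NodeIDsInfo values are folded
  // together with NodeIDsInfo.add, so the federated view holds the union of nodes per label.
  //
  //   SC-1: { gpu -> [host1:8041] }      SC-2: { gpu -> [host2:8041] }
  //   merged: { gpu -> [host1:8041, host2:8041] }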

  /**
   * This method replaces all the node labels for specific nodes, and it is
   * reachable by using {@link RMWSConsts#REPLACE_NODE_TO_LABELS}.
   *
   * @see ResourceManagerAdministrationProtocol#replaceLabelsOnNode
   * @param newNodeToLabels the list of new labels. It is a content param.
   * @param hsr the servlet request
   * @return Response containing the status code
   * @throws IOException if an exception happened
   */
  @Override
  public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
      HttpServletRequest hsr) throws IOException {

    // Step1. Check the parameters to ensure that the parameters are not empty.
    if (newNodeToLabels == null) {
      routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
      throw new IllegalArgumentException("Parameter error, newNodeToLabels must not be empty.");
    }
    List<NodeToLabelsEntry> nodeToLabelsEntries = newNodeToLabels.getNodeToLabels();
    if (CollectionUtils.isEmpty(nodeToLabelsEntries)) {
      routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REPLACE_LABELSONNODES, UNKNOWN,
          TARGET_WEB_SERVICE, "Parameter error, " +
          "nodeToLabelsEntries must not be empty.");
      throw new IllegalArgumentException("Parameter error, " +
         "nodeToLabelsEntries must not be empty.");
    }

    try {

      // Step2. We map the NodeId and NodeToLabelsEntry in the request.
      Map<String, NodeToLabelsEntry> nodeIdToLabels = new HashMap<>();
      newNodeToLabels.getNodeToLabels().forEach(nodeIdToLabel -> {
        String nodeId = nodeIdToLabel.getNodeId();
        nodeIdToLabels.put(nodeId, nodeIdToLabel);
      });

      // Step3. We map SubCluster with NodeToLabelsEntryList
      Map<SubClusterInfo, NodeToLabelsEntryList> subClusterToNodeToLabelsEntryList =
          new HashMap<>();
      nodeIdToLabels.forEach((nodeId, nodeToLabelsEntry) -> {
        SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
        NodeToLabelsEntryList nodeToLabelsEntryList = subClusterToNodeToLabelsEntryList.
            getOrDefault(subClusterInfo, new NodeToLabelsEntryList());
        nodeToLabelsEntryList.getNodeToLabels().add(nodeToLabelsEntry);
        subClusterToNodeToLabelsEntryList.put(subClusterInfo, nodeToLabelsEntryList);
      });
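      // For illustration (hypothetical nodes): with "host1:8041" living in subCluster SC-1
      // and "host2:8041" in SC-2, the map built above would be
      //   { SC-1 -> NodeToLabelsEntryList[entry(host1:8041)],
      //     SC-2 -> NodeToLabelsEntryList[entry(host2:8041)] }
      // so in Step4 each subCluster only receives the entries for its own nodes.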

      // Step4. Traverse the subCluster and call the replaceLabelsOnNodes interface.
      long startTime = clock.getTime();
      final HttpServletRequest hsrCopy = clone(hsr);
      StringBuilder builder = new StringBuilder();
      subClusterToNodeToLabelsEntryList.forEach((subClusterInfo, nodeToLabelsEntryList) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        try {
          DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
              subClusterInfo);
          interceptor.replaceLabelsOnNodes(nodeToLabelsEntryList, hsrCopy);
          builder.append("subCluster-").append(subClusterId.getId()).append(":Success,");
        } catch (Exception e) {
          LOG.error("replaceLabelsOnNodes Failed. subClusterId = {}.", subClusterId, e);
          builder.append("subCluster-").append(subClusterId.getId()).append(":Failed,");
        }
      });
      long stopTime = clock.getTime();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), REPLACE_LABELSONNODES,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededReplaceLabelsOnNodesRetrieved(stopTime - startTime);

      // Step5. return call result.
      return Response.status(Status.OK).entity(builder.toString()).build();
    } catch (Exception e) {
      routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REPLACE_LABELSONNODES, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }
  }

  /**
   * This method replaces all the node labels for a specific node, and it is
   * reachable by using {@link RMWSConsts#NODES_NODEID_REPLACE_LABELS}.
   *
   * @see ResourceManagerAdministrationProtocol#replaceLabelsOnNode
   * @param newNodeLabelsName the list of new labels. It is a QueryParam.
   * @param hsr the servlet request
   * @param nodeId the node whose labels we want to replace. It is a
   *     PathParam.
   * @return Response containing the status code
   * @throws Exception if an exception happened
   */
  @Override
  public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
      HttpServletRequest hsr, String nodeId) throws Exception {

    // Step1. Check the parameters to ensure that the parameters are not empty.
    if (StringUtils.isBlank(nodeId)) {
      routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REPLACE_LABELSONNODE, UNKNOWN,
          TARGET_WEB_SERVICE, "Parameter error, nodeId must not be null or empty.");
      throw new IllegalArgumentException("Parameter error, nodeId must not be null or empty.");
    }
    if (CollectionUtils.isEmpty(newNodeLabelsName)) {
      routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REPLACE_LABELSONNODE, UNKNOWN,
          TARGET_WEB_SERVICE, "Parameter error, newNodeLabelsName must not be empty.");
      throw new IllegalArgumentException("Parameter error, newNodeLabelsName must not be empty.");
    }

    try {
      // Step2. We find the subCluster according to the nodeId,
      // and then call the replaceLabelsOnNode of the subCluster.
      long startTime = clock.getTime();
      SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByNodeId(nodeId);
      final HttpServletRequest hsrCopy = clone(hsr);
      interceptor.replaceLabelsOnNode(newNodeLabelsName, hsrCopy, nodeId);

      // Step3. Return the response result.
      long stopTime = clock.getTime();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), REPLACE_LABELSONNODE,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededReplaceLabelsOnNodeRetrieved(stopTime - startTime);
      String msg = "subCluster#" + subClusterInfo.getSubClusterId().getId() + ":Success;";
      return Response.status(Status.OK).entity(msg).build();
    } catch (Exception e) {
      routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REPLACE_LABELSONNODE, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }
  }

  @Override
  public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
      throws IOException {
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class};
      Object[] args = new Object[]{hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", argsClasses, args);
      Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, NodeLabelsInfo.class);
      Set<NodeLabel> hashSets = Sets.newHashSet();
      nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels()));
      NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(hashSets);
      if (nodeLabelsInfo != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_CLUSTER_NODELABELS,
            TARGET_WEB_SERVICE);
        return nodeLabelsInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTER_NODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTER_NODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("getClusterNodeLabels with yarn error.", e);
    }
    routerMetrics.incrClusterNodeLabelsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CLUSTER_NODELABELS, UNKNOWN,
        TARGET_WEB_SERVICE, "getClusterNodeLabels Failed.");
    throw new RuntimeException("getClusterNodeLabels Failed.");
  }

  /**
   * This method adds specific node labels for specific nodes, and it is
   * reachable by using {@link RMWSConsts#ADD_NODE_LABELS}.
   *
   * @see ResourceManagerAdministrationProtocol#addToClusterNodeLabels
   * @param newNodeLabels the node labels to add. It is a content param.
   * @param hsr the servlet request
   * @return Response containing the status code
   * @throws Exception in case of bad request
   */
  @Override
  public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
      HttpServletRequest hsr) throws Exception {

    if (newNodeLabels == null) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), ADD_TO_CLUSTER_NODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, "Parameter error, the newNodeLabels is null.");
      throw new IllegalArgumentException("Parameter error, the newNodeLabels is null.");
    }

    List<NodeLabelInfo> nodeLabelInfos = newNodeLabels.getNodeLabelsInfo();
    if (CollectionUtils.isEmpty(nodeLabelInfos)) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), ADD_TO_CLUSTER_NODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, "Parameter error, the nodeLabelsInfo is null or empty.");
      throw new IllegalArgumentException("Parameter error, the nodeLabelsInfo is null or empty.");
    }

    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActives = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{NodeLabelsInfo.class, HttpServletRequest.class};
      Object[] args = new Object[]{newNodeLabels, hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("addToClusterNodeLabels", argsClasses, args);
      Map<SubClusterInfo, Response> responseInfoMap =
          invokeConcurrent(subClustersActives, remoteMethod, Response.class);
      StringBuilder buffer = new StringBuilder();
      // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
      responseInfoMap.forEach((subClusterInfo, response) ->
          buildAppendMsg(subClusterInfo, buffer, response));
      long stopTime = clock.getTime();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), ADD_TO_CLUSTER_NODELABELS,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededAddToClusterNodeLabelsRetrieved((stopTime - startTime));
      return Response.status(Status.OK).entity(buffer.toString()).build();
    } catch (NotFoundException e) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), ADD_TO_CLUSTER_NODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), ADD_TO_CLUSTER_NODELABELS, UNKNOWN,
          TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("addToClusterNodeLabels with yarn error.", e);
    }

    routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), ADD_TO_CLUSTER_NODELABELS, UNKNOWN,
        TARGET_WEB_SERVICE, "addToClusterNodeLabels Failed.");
    throw new RuntimeException("addToClusterNodeLabels Failed.");
  }

  /**
   * This method removes all the node labels for specific nodes, and it is
   * reachable by using {@link RMWSConsts#REMOVE_NODE_LABELS}.
   *
   * @see ResourceManagerAdministrationProtocol#removeFromClusterNodeLabels
   * @param oldNodeLabels the node labels to remove. It is a QueryParam.
   * @param hsr the servlet request
   * @return Response containing the status code
   * @throws Exception in case of bad request
   */
  @Override
  public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels,
      HttpServletRequest hsr) throws Exception {

    if (CollectionUtils.isEmpty(oldNodeLabels)) {
      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REMOVE_FROM_CLUSTERNODELABELS,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the oldNodeLabels is null or empty.");
      throw new IllegalArgumentException("Parameter error, the oldNodeLabels is null or empty.");
    }

    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActives = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{Set.class, HttpServletRequest.class};
      Object[] args = new Object[]{oldNodeLabels, hsrCopy};
      ClientMethod remoteMethod =
          new ClientMethod("removeFromClusterNodeLabels", argsClasses, args);
      Map<SubClusterInfo, Response> responseInfoMap =
          invokeConcurrent(subClustersActives, remoteMethod, Response.class);
      StringBuilder buffer = new StringBuilder();
      // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
      responseInfoMap.forEach((subClusterInfo, response) ->
          buildAppendMsg(subClusterInfo, buffer, response));
      long stopTime = clock.getTime();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), REMOVE_FROM_CLUSTERNODELABELS,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime);
      return Response.status(Status.OK).entity(buffer.toString()).build();
    } catch (NotFoundException e) {
      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REMOVE_FROM_CLUSTERNODELABELS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), REMOVE_FROM_CLUSTERNODELABELS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("removeFromClusterNodeLabels with yarn error.", e);
    }

    routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), REMOVE_FROM_CLUSTERNODELABELS,
        UNKNOWN, TARGET_WEB_SERVICE, "removeFromClusterNodeLabels Failed.");
    throw new RuntimeException("removeFromClusterNodeLabels Failed.");
  }

  /**
   * Append the result of a single subCluster call to the buffer.
   *
   * @param subClusterInfo subCluster information.
   * @param buffer the StringBuilder that accumulates the per-subCluster results.
   * @param response the response returned by the subCluster call.
   */
  private void buildAppendMsg(SubClusterInfo subClusterInfo, StringBuilder buffer,
      Response response) {
    SubClusterId subClusterId = subClusterInfo.getSubClusterId();
    String state = response != null &&
        (response.getStatus() == Status.OK.getStatusCode()) ? "SUCCESS" : "FAILED";
    buffer.append("SubCluster-")
        .append(subClusterId.getId())
        .append(":")
        .append(state)
        .append(",");
  }
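  // Example of the message assembled via buildAppendMsg for two hypothetical subClusters,
  // where SC-1 answers 200 OK and SC-2 answers with a non-OK status (or a null response):
  //
  //   "SubCluster-SC-1:SUCCESS,SubCluster-SC-2:FAILED,"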

  @Override
  public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
      throws IOException {
    try {
      long startTime = clock.getTime();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class};
      Object[] args = new Object[]{hsrCopy, nodeId};
      ClientMethod remoteMethod = new ClientMethod("getLabelsOnNode", argsClasses, args);
      Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, NodeLabelsInfo.class);
      Set<NodeLabel> hashSets = Sets.newHashSet();
      nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels()));
      NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(hashSets);
      if (nodeLabelsInfo != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_LABELS_ON_NODE,
            TARGET_WEB_SERVICE);
        return nodeLabelsInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrLabelsToNodesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_LABELS_ON_NODE,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
    } catch (YarnException e) {
      routerMetrics.incrLabelsToNodesFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_LABELS_ON_NODE,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowIOException(
          e, "getLabelsOnNode nodeId = %s with yarn error.", nodeId);
    }
    routerMetrics.incrLabelsToNodesFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_LABELS_ON_NODE,
        UNKNOWN, TARGET_WEB_SERVICE, "getLabelsOnNode by nodeId = " + nodeId + " Failed.");
    throw RouterServerUtil.logAndReturnRunTimeException(
        "getLabelsOnNode by nodeId = %s Failed.", nodeId);
  }

  @Override
  public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
      throws AuthorizationException {

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      AppPriority appPriority = interceptor.getAppPriority(hsr, appId);
      if (appPriority != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetAppPriorityRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APP_PRIORITY,
            TARGET_WEB_SERVICE);
        return appPriority;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetAppPriorityFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_PRIORITY,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get the getAppPriority appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrGetAppPriorityFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_PRIORITY,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getAppPriority error.", e);
    }
    routerMetrics.incrGetAppPriorityFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_PRIORITY,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppPriority Failed.");
    throw new RuntimeException("getAppPriority Failed.");
  }

  @Override
  public Response updateApplicationPriority(AppPriority targetPriority,
      HttpServletRequest hsr, String appId) throws AuthorizationException,
      YarnException, InterruptedException, IOException {

    if (targetPriority == null) {
      routerMetrics.incrUpdateAppPriorityFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONPRIORITY,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the targetPriority is empty or null.");
      throw new IllegalArgumentException("Parameter error, the targetPriority is empty or null.");
    }

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      Response response = interceptor.updateApplicationPriority(targetPriority, hsr, appId);
      if (response != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), UPDATE_APPLICATIONPRIORITY,
            TARGET_WEB_SERVICE);
        return response;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrUpdateAppPriorityFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONPRIORITY,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get the updateApplicationPriority appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrUpdateAppPriorityFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONPRIORITY,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("updateApplicationPriority error.", e);
    }
    RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONPRIORITY,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppPriority Failed.");
    routerMetrics.incrUpdateAppPriorityFailedRetrieved();
    throw new RuntimeException("updateApplicationPriority Failed.");
  }

  @Override
  public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
      throws AuthorizationException {

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      AppQueue queue = interceptor.getAppQueue(hsr, appId);
      if (queue != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetAppQueueRetrieved((stopTime - startTime));
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_QUEUEINFO,
            TARGET_WEB_SERVICE);
        return queue;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetAppQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_QUEUEINFO,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e, "Unable to get queue by appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrGetAppQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_QUEUEINFO,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getAppQueue error.", e);
    }
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_QUEUEINFO,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppQueue Failed.");
    routerMetrics.incrGetAppQueueFailedRetrieved();
    throw new RuntimeException("getAppQueue Failed.");
  }

  @Override
  public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
      String appId) throws AuthorizationException, YarnException,
      InterruptedException, IOException {

    if (targetQueue == null) {
      routerMetrics.incrUpdateAppQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APP_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the targetQueue is null.");
      throw new IllegalArgumentException("Parameter error, the targetQueue is null.");
    }

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      Response response = interceptor.updateAppQueue(targetQueue, hsr, appId);
      if (response != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededUpdateAppQueueRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), UPDATE_APP_QUEUE,
            TARGET_WEB_SERVICE);
        return response;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrUpdateAppQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APP_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to update app queue by appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrUpdateAppQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APP_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("updateAppQueue error.", e);
    }
    RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APP_QUEUE,
        UNKNOWN, TARGET_WEB_SERVICE, "updateAppQueue Failed.");
    routerMetrics.incrUpdateAppQueueFailedRetrieved();
    throw new RuntimeException("updateAppQueue Failed.");
  }

  /**
   * This method creates a delegation token for the client.
   *
   * @param tokenData the token to delegate. It is a content param.
   * @param hsr the servlet request.
   * @return Response containing the status code.
   * @throws AuthorizationException if Kerberos auth failed.
   * @throws IOException if the delegation failed.
   * @throws InterruptedException if interrupted.
   * @throws Exception in case of bad request.
   */
  @Override
  public Response postDelegationToken(DelegationToken tokenData, HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException, Exception {

    if (tokenData == null || hsr == null) {
      RouterAuditLogger.logFailure(getUser().getShortUserName(), POST_DELEGATION_TOKEN,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the tokenData or hsr is null.");
      throw new IllegalArgumentException("Parameter error, the tokenData or hsr is null.");
    }

    try {
      // get Caller UserGroupInformation
      Configuration conf = federationFacade.getConf();
      UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

      // create a delegation token
      return createDelegationToken(tokenData, callerUGI);
    } catch (YarnException e) {
      LOG.error("Create delegation token request failed.", e);
      RouterAuditLogger.logFailure(getUser().getShortUserName(), POST_DELEGATION_TOKEN,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
    }
  }

  /**
   * Create DelegationToken.
   *
   * @param dtoken DelegationToken Data.
   * @param callerUGI UserGroupInformation.
   * @return Response.
   * @throws IOException if there are I/O errors.
   * @throws InterruptedException if any thread has interrupted the current thread.
   */
  private Response createDelegationToken(DelegationToken dtoken, UserGroupInformation callerUGI)
      throws IOException, InterruptedException {

    String renewer = dtoken.getRenewer();

    GetDelegationTokenResponse resp = callerUGI.doAs(
        (PrivilegedExceptionAction<GetDelegationTokenResponse>) () -> {
        GetDelegationTokenRequest createReq = GetDelegationTokenRequest.newInstance(renewer);
        return this.getRouterClientRMService().getDelegationToken(createReq);
      });

    DelegationToken respToken = getDelegationToken(renewer, resp);
    RouterAuditLogger.logSuccess(getUser().getShortUserName(), POST_DELEGATION_TOKEN,
        TARGET_WEB_SERVICE);
    return Response.status(Status.OK).entity(respToken).build();
  }

  /**
   * Get DelegationToken.
   *
   * @param renewer renewer.
   * @param resp GetDelegationTokenResponse.
   * @return DelegationToken.
   * @throws IOException if there are I/O errors.
   */
  private DelegationToken getDelegationToken(String renewer, GetDelegationTokenResponse resp)
      throws IOException {
    // Step1. Parse token from GetDelegationTokenResponse.
    Token<RMDelegationTokenIdentifier> tk = getToken(resp);
    String tokenKind = tk.getKind().toString();
    RMDelegationTokenIdentifier tokenIdentifier = tk.decodeIdentifier();
    String owner = tokenIdentifier.getOwner().toString();
    long maxDate = tokenIdentifier.getMaxDate();

    // Step2. Call the interface to get the expiration time of Token.
    RouterClientRMService clientRMService = this.getRouterClientRMService();
    RouterDelegationTokenSecretManager tokenSecretManager =
        clientRMService.getRouterDTSecretManager();
    long currentExpiration = tokenSecretManager.getRenewDate(tokenIdentifier);

    // Step3. Generate Delegation token.
    DelegationToken delegationToken = new DelegationToken(tk.encodeToUrlString(),
        renewer, owner, tokenKind, currentExpiration, maxDate);

    return delegationToken;
  }
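  // A rough sketch of the DelegationToken assembled above (values are illustrative):
  //   token      -> tk.encodeToUrlString(), the URL-safe encoded RM delegation token
  //   renewer    -> the renewer requested by the client
  //   owner      -> taken from the decoded RMDelegationTokenIdentifier
  //   kind       -> tk.getKind().toString(), e.g. "RM_DELEGATION_TOKEN"
  //   expiration -> the renew date tracked by the RouterDelegationTokenSecretManager
  //   maxDate    -> the identifier's maximum lifetime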

  /**
   * Get Token.
   * Convert the RMDelegationToken in the GetDelegationTokenResponse to a Token.
   *
   * @param resp GetDelegationTokenResponse.
   * @return Token.
   */
  private static Token<RMDelegationTokenIdentifier> getToken(GetDelegationTokenResponse resp) {
    org.apache.hadoop.yarn.api.records.Token token = resp.getRMDelegationToken();
    byte[] identifier = token.getIdentifier().array();
    byte[] password = token.getPassword().array();
    Text kind = new Text(token.getKind());
    Text service = new Text(token.getService());
    return new Token<>(identifier, password, kind, service);
  }

  /**
   * This method updates the expiration for a delegation token from the client.
   *
   * @param hsr the servlet request
   * @return Response containing the status code.
   * @throws AuthorizationException if Kerberos auth failed.
   * @throws IOException if the delegation failed.
   * @throws InterruptedException  if interrupted.
   * @throws Exception in case of bad request.
   */
  @Override
  public Response postDelegationTokenExpiration(HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException, Exception {

    if (hsr == null) {
      RouterAuditLogger.logFailure(getUser().getShortUserName(), POST_DELEGATION_TOKEN_EXPIRATION,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the hsr is null.");
      throw new IllegalArgumentException("Parameter error, the hsr is null.");
    }

    try {
      // get Caller UserGroupInformation
      Configuration conf = federationFacade.getConf();
      UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);
      return renewDelegationToken(hsr, callerUGI);
    } catch (YarnException e) {
      LOG.error("Renew delegation token request failed.", e);
      RouterAuditLogger.logFailure(getUser().getShortUserName(), POST_DELEGATION_TOKEN_EXPIRATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
    }
  }

  /**
   * Renew DelegationToken.
   *
   * @param hsr HttpServletRequest.
   * @param callerUGI UserGroupInformation.
   * @return Response
   * @throws IOException if there are I/O errors.
   * @throws InterruptedException if any thread has interrupted the current thread.
   */
  private Response renewDelegationToken(HttpServletRequest hsr, UserGroupInformation callerUGI)
      throws IOException, InterruptedException {

    // renew Delegation Token
    DelegationToken tokenData = new DelegationToken();
    String encodeToken = extractToken(hsr).encodeToUrlString();
    tokenData.setToken(encodeToken);

    // Parse token data
    Token<RMDelegationTokenIdentifier> token = extractToken(tokenData.getToken());
    org.apache.hadoop.yarn.api.records.Token dToken =
        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
        token.getPassword(), token.getService().toString());

    // Renew token
    RenewDelegationTokenResponse resp = callerUGI.doAs(
        (PrivilegedExceptionAction<RenewDelegationTokenResponse>) () -> {
        RenewDelegationTokenRequest req = RenewDelegationTokenRequest.newInstance(dToken);
        return this.getRouterClientRMService().renewDelegationToken(req);
      });

    // Return the next expiration time in a DelegationToken.
    long renewTime = resp.getNextExpirationTime();
    DelegationToken respToken = new DelegationToken();
    respToken.setNextExpirationTime(renewTime);
    RouterAuditLogger.logSuccess(getUser().getShortUserName(), POST_DELEGATION_TOKEN_EXPIRATION,
        TARGET_WEB_SERVICE);
    return Response.status(Status.OK).entity(respToken).build();
  }

  /**
   * Cancel DelegationToken.
   *
   * @param hsr the servlet request
   * @return Response containing the status code.
   * @throws AuthorizationException if Kerberos auth failed.
   * @throws IOException if the delegation failed.
   * @throws InterruptedException if interrupted.
   * @throws Exception in case of bad request.
   */
  @Override
  public Response cancelDelegationToken(HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException, Exception {
    try {
      // get Caller UserGroupInformation
      Configuration conf = federationFacade.getConf();
      UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

      // parse Token Data
      Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
      org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils
          .newDelegationToken(token.getIdentifier(), token.getKind().toString(),
          token.getPassword(), token.getService().toString());

      // cancelDelegationToken
      callerUGI.doAs((PrivilegedExceptionAction<CancelDelegationTokenResponse>) () -> {
        CancelDelegationTokenRequest req = CancelDelegationTokenRequest.newInstance(dToken);
        return this.getRouterClientRMService().cancelDelegationToken(req);
      });

      RouterAuditLogger.logSuccess(getUser().getShortUserName(), CANCEL_DELEGATIONTOKEN,
          TARGET_WEB_SERVICE);
      return Response.status(Status.OK).build();
    } catch (YarnException e) {
      LOG.error("Cancel delegation token request failed.", e);
      RouterAuditLogger.logFailure(getUser().getShortUserName(), CANCEL_DELEGATIONTOKEN,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
    }
  }

  @Override
  public Response createNewReservation(HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException {
    long startTime = clock.getTime();
    try {
      Map<SubClusterId, SubClusterInfo> subClustersActive =
          federationFacade.getSubClusters(true);
      // Declare the blacklist and compute the actual number of retries.
      List<SubClusterId> blackList = new ArrayList<>();
      int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
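      // Wrap the single-attempt call in a FederationActionRetry: a sub-cluster that
      // fails is added to the blacklist and the request is retried on another
      // randomly chosen active sub-cluster, up to actualRetryNums times.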
      Response response = ((FederationActionRetry<Response>) (retryCount) ->
          invokeCreateNewReservation(subClustersActive, blackList, hsr, retryCount)).
          runWithRetries(actualRetryNums, submitIntervalTime);
      // If the response is not null and the status is SC_OK,
      // the response can be returned directly.
      if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_NEW_RESERVATION,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime);
        return response;
      }
    } catch (FederationPolicyException e) {
      // If a FederationPolicyException is thrown, the service is unavailable.
      routerMetrics.incrGetNewReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
    } catch (Exception e) {
      routerMetrics.incrGetNewReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
    }

    // return error message directly.
    String errMsg = "Fail to create a new reservation.";
    LOG.error(errMsg);
    routerMetrics.incrGetNewReservationFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_RESERVATION,
        UNKNOWN, TARGET_WEB_SERVICE, errMsg);
    return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
  }
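
  /**
   * Try to create a new reservation on a single, randomly chosen active
   * SubCluster that is not in the blacklist. On failure the SubCluster is
   * blacklisted so that the retry logic can pick a different one.
   *
   * @param subClustersActive the active SubClusters.
   * @param blackList SubClusters that already failed for this request.
   * @param hsr the servlet request.
   * @param retryCount the current retry attempt number.
   * @return the OK response returned by the SubCluster.
   * @throws YarnException if the SubCluster could not create the reservation.
   */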

  private Response invokeCreateNewReservation(Map<SubClusterId, SubClusterInfo> subClustersActive,
      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
      throws YarnException {
    SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive, blackList);
    LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, subClusterId);
    SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId);
    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
        subClusterId, subClusterInfo.getRMWebServiceAddress());
    try {
      Response response = interceptor.createNewReservation(hsr);
      if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
        return response;
      }
    } catch (Exception e) {
      blackList.add(subClusterId);
      RouterServerUtil.logAndThrowException(e.getMessage(), e);
    }
    // If we reach this point, the sub-cluster did not return an OK response, so throw.
    String msg = String.format("Unable to create a new ReservationId in SubCluster %s.",
        subClusterId.getId());
    throw new YarnException(msg);
  }

  @Override
  public Response submitReservation(ReservationSubmissionRequestInfo resContext,
      HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
    long startTime = clock.getTime();
    if (resContext == null || resContext.getReservationId() == null
        || resContext.getReservationDefinition() == null || resContext.getQueue() == null) {
      routerMetrics.incrSubmitReservationFailedRetrieved();
      String errMsg = "Missing submitReservation resContext or reservationId " +
          "or reservation definition or queue.";
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, errMsg);
      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
    }

    // Check that the resId format is valid.
    String resId = resContext.getReservationId();
    try {
      RouterServerUtil.validateReservationId(resId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrSubmitReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    List<SubClusterId> blackList = new ArrayList<>();
    try {
      int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
      int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
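      // Retry at most once per active sub-cluster; invokeSubmitReservation adds a
      // failing sub-cluster to the blacklist before rethrowing.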
      Response response = ((FederationActionRetry<Response>) (retryCount) ->
          invokeSubmitReservation(resContext, blackList, hsr, retryCount)).
          runWithRetries(actualRetryNums, submitIntervalTime);
      if (response != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), SUBMIT_RESERVATION,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime);
        return response;
      }
    } catch (Exception e) {
      routerMetrics.incrSubmitReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
    }

    routerMetrics.incrSubmitReservationFailedRetrieved();
    String msg = String.format("Reservation %s failed to be submitted.", resId);
    RouterAuditLogger.logFailure(getUser().getShortUserName(), SUBMIT_RESERVATION,
        UNKNOWN, TARGET_WEB_SERVICE, msg);
    return Response.status(Status.SERVICE_UNAVAILABLE).entity(msg).build();
  }
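
  /**
   * Submit the reservation to the home SubCluster chosen by the policy, and
   * store the Reservation to HomeSubCluster mapping in the StateStore. On
   * failure the chosen SubCluster is blacklisted and the exception is rethrown.
   *
   * @param requestContext the reservation submission context.
   * @param blackList SubClusters that already failed for this request.
   * @param hsr the servlet request.
   * @param retryCount the current retry attempt number.
   * @return the ACCEPTED response returned by the SubCluster.
   * @throws YarnException if the reservation could not be submitted.
   * @throws IOException if there are I/O errors.
   * @throws InterruptedException if any thread has interrupted the current thread.
   */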

  private Response invokeSubmitReservation(ReservationSubmissionRequestInfo requestContext,
      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
      throws YarnException, IOException, InterruptedException {
    String resId = requestContext.getReservationId();
    ReservationId reservationId = ReservationId.parseReservationId(resId);
    ReservationDefinitionInfo definitionInfo = requestContext.getReservationDefinition();
    ReservationDefinition definition =
         RouterServerUtil.convertReservationDefinition(definitionInfo);

    // Step1. Build the ReservationSubmissionRequest and select the home SubCluster
    // according to the policy.
    ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
        definition, requestContext.getQueue(), reservationId);
    SubClusterId subClusterId = null;

    try {
      // Get subClusterId from policy.
      subClusterId = policyFacade.getReservationHomeSubCluster(request);

      // Log the reservation submission attempt.
      LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.", reservationId,
          retryCount, subClusterId);

      // Step2. We store the mapping relationship
      // between the Reservation and its HomeSubCluster in the stateStore.
      federationFacade.addOrUpdateReservationHomeSubCluster(reservationId,
          subClusterId, retryCount);

      // Step3. We get subClusterInfo based on subClusterId.
      SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);

      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
      HttpServletRequest hsrCopy = clone(hsr);
      Response response = interceptor.submitReservation(requestContext, hsrCopy);
      if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
        LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId);
        return response;
      }
      String msg = String.format("application %s failed to be submitted.", resId);
      throw new YarnException(msg);
    } catch (Exception e) {
      LOG.warn("Unable to submit the reservation {} to SubCluster {}.", resId,
          subClusterId, e);
      if (subClusterId != null) {
        blackList.add(subClusterId);
      }
      throw e;
    }
  }

  @Override
  public Response updateReservation(ReservationUpdateRequestInfo resContext,
      HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {

    // parameter verification
    if (resContext == null || resContext.getReservationId() == null
        || resContext.getReservationDefinition() == null) {
      routerMetrics.incrUpdateReservationFailedRetrieved();
      String errMsg = "Missing updateReservation resContext or reservationId " +
          "or reservation definition.";
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, errMsg);
      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
    }

    // get reservationId
    String reservationId = resContext.getReservationId();

    // Check that the reservationId format is valid.
    try {
      RouterServerUtil.validateReservationId(reservationId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrUpdateReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    try {
      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
      HttpServletRequest hsrCopy = clone(hsr);
      Response response = interceptor.updateReservation(resContext, hsrCopy);
      if (response != null) {
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), UPDATE_RESERVATION,
            TARGET_WEB_SERVICE);
        return response;
      }
    } catch (Exception e) {
      routerMetrics.incrUpdateReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("updateReservation Failed.", e);
    }

    // throw an exception
    routerMetrics.incrUpdateReservationFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_RESERVATION,
        UNKNOWN, TARGET_WEB_SERVICE, "updateReservation Failed, reservationId = " + reservationId);
    throw new YarnRuntimeException("updateReservation Failed, reservationId = " + reservationId);
  }

  @Override
  public Response deleteReservation(ReservationDeleteRequestInfo resContext,
      HttpServletRequest hsr)
      throws AuthorizationException, IOException, InterruptedException {

    // parameter verification
    if (resContext == null || resContext.getReservationId() == null) {
      routerMetrics.incrDeleteReservationFailedRetrieved();
      String errMsg = "Missing deleteReservation request or reservationId.";
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DELETE_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, errMsg);
      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
    }

    // get ReservationId
    String reservationId = resContext.getReservationId();

    // Check that the reservationId format is valid.
    try {
      RouterServerUtil.validateReservationId(reservationId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrDeleteReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DELETE_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    try {
      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
      HttpServletRequest hsrCopy = clone(hsr);
      Response response = interceptor.deleteReservation(resContext, hsrCopy);
      if (response != null) {
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), DELETE_RESERVATION,
            TARGET_WEB_SERVICE);
        return response;
      }
    } catch (Exception e) {
      routerMetrics.incrDeleteReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), DELETE_RESERVATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("deleteReservation Failed.", e);
    }

    // throw an exception
    routerMetrics.incrDeleteReservationFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), DELETE_RESERVATION,
        UNKNOWN, TARGET_WEB_SERVICE, "deleteReservation Failed, reservationId = " + reservationId);
    throw new YarnRuntimeException("deleteReservation Failed, reservationId = " + reservationId);
  }

  @Override
  public Response listReservation(String queue, String reservationId,
      long startTime, long endTime, boolean includeResourceAllocations,
      HttpServletRequest hsr) throws Exception {

    if (queue == null || queue.isEmpty()) {
      routerMetrics.incrListReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), LIST_RESERVATIONS,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the queue is empty or null.");
      throw new IllegalArgumentException("Parameter error, the queue is empty or null.");
    }

    if (reservationId == null || reservationId.isEmpty()) {
      routerMetrics.incrListReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), LIST_RESERVATIONS,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the reservationId is empty or null.");
      throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
    }

    // Check that the reservationId format is valid.
    try {
      RouterServerUtil.validateReservationId(reservationId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrListReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), LIST_RESERVATIONS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    try {
      long startTime1 = clock.getTime();
      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
      HttpServletRequest hsrCopy = clone(hsr);
      Response response = interceptor.listReservation(queue, reservationId, startTime, endTime,
          includeResourceAllocations, hsrCopy);
      if (response != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), LIST_RESERVATIONS,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededListReservationRetrieved(stopTime - startTime1);
        return response;
      }
    } catch (YarnException e) {
      routerMetrics.incrListReservationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), LIST_RESERVATIONS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("listReservation error.", e);
    }

    routerMetrics.incrListReservationFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), LIST_RESERVATIONS,
        UNKNOWN, TARGET_WEB_SERVICE, "listReservation Failed.");
    throw new YarnException("listReservation Failed.");
  }

  @Override
  public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
      String type) throws AuthorizationException {

    if (type == null || type.isEmpty()) {
      routerMetrics.incrGetAppTimeoutFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUT,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the type is empty or null.");
      throw new IllegalArgumentException("Parameter error, the type is empty or null.");
    }

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      AppTimeoutInfo appTimeoutInfo = interceptor.getAppTimeout(hsr, appId, type);
      if (appTimeoutInfo != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APP_TIMEOUT,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededGetAppTimeoutRetrieved((stopTime - startTime));
        return appTimeoutInfo;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetAppTimeoutFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUT,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get the getAppTimeout appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrGetAppTimeoutFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUT,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getAppTimeout error.", e);
    }
    routerMetrics.incrGetAppTimeoutFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUT,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppTimeout Failed.");
    throw new RuntimeException("getAppTimeout Failed.");
  }

  @Override
  public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
      throws AuthorizationException {

    try {
      long startTime = clock.getTime();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      AppTimeoutsInfo appTimeoutsInfo = interceptor.getAppTimeouts(hsr, appId);
      if (appTimeoutsInfo != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APP_TIMEOUTS,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededGetAppTimeoutsRetrieved((stopTime - startTime));
        return appTimeoutsInfo;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetAppTimeoutsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUTS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get the getAppTimeouts appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrGetAppTimeoutsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUTS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getAppTimeouts error.", e);
    }

    routerMetrics.incrGetAppTimeoutsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_TIMEOUTS,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppTimeouts Failed.");
    throw new RuntimeException("getAppTimeouts Failed.");
  }

  @Override
  public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
      HttpServletRequest hsr, String appId) throws AuthorizationException,
      YarnException, InterruptedException, IOException {

    if (appTimeout == null) {
      routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONTIMEOUTS,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the appTimeout is null.");
      throw new IllegalArgumentException("Parameter error, the appTimeout is null.");
    }

    try {
      long startTime = Time.now();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      Response response = interceptor.updateApplicationTimeout(appTimeout, hsr, appId);
      if (response != null) {
        long stopTime = Time.now();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), UPDATE_APPLICATIONTIMEOUTS,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededUpdateAppTimeoutsRetrieved((stopTime - startTime));
        return response;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONTIMEOUTS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get the updateApplicationTimeout appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONTIMEOUTS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("updateApplicationTimeout error.", e);
    }

    routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_APPLICATIONTIMEOUTS,
        UNKNOWN, TARGET_WEB_SERVICE, "updateApplicationTimeout Failed.");
    throw new RuntimeException("updateApplicationTimeout Failed.");
  }

  @Override
  public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {

    try {
      long startTime = Time.now();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(hsr, appId);
      if (appAttemptsInfo != null) {
        long stopTime = Time.now();
        routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APPLICATION_ATTEMPTS,
            TARGET_WEB_SERVICE);
        return appAttemptsInfo;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrAppAttemptsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPLICATION_ATTEMPTS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to get the AppAttempt appId: %s.", appId);
    } catch (YarnException e) {
      routerMetrics.incrAppAttemptsFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPLICATION_ATTEMPTS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getAppAttempts error.", e);
    }

    routerMetrics.incrAppAttemptsFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APPLICATION_ATTEMPTS,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppAttempts Failed.");
    throw new RuntimeException("getAppAttempts Failed.");
  }

  @Override
  public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
      String queueAclType, HttpServletRequest hsr) throws AuthorizationException {

    // Parameter Verification
    if (queue == null || queue.isEmpty()) {
      routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the queue is empty or null.");
      throw new IllegalArgumentException("Parameter error, the queue is empty or null.");
    }

    if (username == null || username.isEmpty()) {
      routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the username is empty or null.");
      throw new IllegalArgumentException("Parameter error, the username is empty or null.");
    }

    if (queueAclType == null || queueAclType.isEmpty()) {
      routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the queueAclType is empty or null.");
      throw new IllegalArgumentException("Parameter error, the queueAclType is empty or null.");
    }

    // Traverse the active SubClusters and call the checkUserAccessToQueue API on each.
    try {
      long startTime = Time.now();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{String.class, String.class, String.class,
          HttpServletRequest.class};
      Object[] args = new Object[]{queue, username, queueAclType, hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("checkUserAccessToQueue", argsClasses, args);
      Map<SubClusterInfo, RMQueueAclInfo> rmQueueAclInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, RMQueueAclInfo.class);
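      // Merge the per-sub-cluster ACL results, tagging each entry with its SubClusterId.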
      FederationRMQueueAclInfo aclInfo = new FederationRMQueueAclInfo();
      rmQueueAclInfoMap.forEach((subClusterInfo, rMQueueAclInfo) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        rMQueueAclInfo.setSubClusterId(subClusterId.getId());
        aclInfo.getList().add(rMQueueAclInfo);
      });
      long stopTime = Time.now();
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
          TARGET_WEB_SERVICE);
      routerMetrics.succeededCheckUserAccessToQueueRetrieved(stopTime - startTime);
      return aclInfo;
    } catch (NotFoundException e) {
      routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
    } catch (YarnException | IOException e) {
      routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("checkUserAccessToQueue error.", e);
    }

    routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), CHECK_USER_ACCESS_TO_QUEUE,
        UNKNOWN, TARGET_WEB_SERVICE, "checkUserAccessToQueue error.");
    throw new RuntimeException("checkUserAccessToQueue error.");
  }

  @Override
  public AppAttemptInfo getAppAttempt(HttpServletRequest req,
      HttpServletResponse res, String appId, String appAttemptId) {

    // Check that the appAttemptId format is valid.
    try {
      RouterServerUtil.validateApplicationAttemptId(appAttemptId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrAppAttemptReportFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_ATTEMPT,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    // Call the getAppAttempt method
    try {
      long startTime = Time.now();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      AppAttemptInfo appAttemptInfo = interceptor.getAppAttempt(req, res, appId, appAttemptId);
      if (appAttemptInfo != null) {
        long stopTime = Time.now();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_APP_ATTEMPT,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededAppAttemptReportRetrieved(stopTime - startTime);
        return appAttemptInfo;
      }
    } catch (IllegalArgumentException e) {
      routerMetrics.incrAppAttemptReportFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_ATTEMPT,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Unable to getAppAttempt by appId: %s, appAttemptId: %s.", appId, appAttemptId);
    } catch (YarnException e) {
      routerMetrics.incrAppAttemptReportFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_ATTEMPT,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "getAppAttempt error, appId: %s, appAttemptId: %s.", appId, appAttemptId);
    }

    routerMetrics.incrAppAttemptReportFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_APP_ATTEMPT,
        UNKNOWN, TARGET_WEB_SERVICE, "getAppAttempt failed.");
    throw RouterServerUtil.logAndReturnRunTimeException(
        "getAppAttempt failed, appId: %s, appAttemptId: %s.", appId, appAttemptId);
  }

  @Override
  public ContainersInfo getContainers(HttpServletRequest req,
      HttpServletResponse res, String appId, String appAttemptId) {

    // Check that the appId/appAttemptId format is valid.
    try {
      RouterServerUtil.validateApplicationId(appId);
      RouterServerUtil.validateApplicationAttemptId(appAttemptId);
    } catch (IllegalArgumentException e) {
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINERS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      routerMetrics.incrGetContainersFailedRetrieved();
      throw e;
    }

    try {
      long startTime = clock.getTime();
      ContainersInfo containersInfo = new ContainersInfo();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      Class[] argsClasses = new Class[]{
          HttpServletRequest.class, HttpServletResponse.class, String.class, String.class};
      Object[] args = new Object[]{req, res, appId, appAttemptId};
      ClientMethod remoteMethod = new ClientMethod("getContainers", argsClasses, args);
      Map<SubClusterInfo, ContainersInfo> containersInfoMap =
          invokeConcurrent(subClustersActive, remoteMethod, ContainersInfo.class);
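      // Merge the containers reported by every sub-cluster into a single ContainersInfo.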
      if (containersInfoMap != null && !containersInfoMap.isEmpty()) {
        containersInfoMap.values().forEach(containers ->
            containersInfo.addAll(containers.getContainers()));
      }
      if (containersInfo != null) {
        long stopTime = clock.getTime();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_CONTAINERS,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededGetContainersRetrieved(stopTime - startTime);
        return containersInfo;
      }
    } catch (NotFoundException e) {
      routerMetrics.incrGetContainersFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINERS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e, "getContainers error, appId = %s, " +
          " appAttemptId = %s, Probably getActiveSubclusters error.", appId, appAttemptId);
    } catch (IOException | YarnException e) {
      routerMetrics.incrGetContainersFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINERS,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e, "getContainers error, appId = %s, " +
          " appAttemptId = %s.", appId, appAttemptId);
    }

    routerMetrics.incrGetContainersFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINERS,
        UNKNOWN, TARGET_WEB_SERVICE, "getContainers failed.");
    throw RouterServerUtil.logAndReturnRunTimeException(
        "getContainers failed, appId: %s, appAttemptId: %s.", appId, appAttemptId);
  }

  @Override
  public ContainerInfo getContainer(HttpServletRequest req,
      HttpServletResponse res, String appId, String appAttemptId,
      String containerId) {

    // FederationInterceptorREST#getContainer is logically
    // the same as FederationClientInterceptor#getContainerReport,
    // so use the same Metric.

    // Check that the appAttemptId/containerId format is valid.
    try {
      RouterServerUtil.validateApplicationAttemptId(appAttemptId);
      RouterServerUtil.validateContainerId(containerId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrGetContainerReportFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    try {
      long startTime = Time.now();
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId);
      ContainerInfo containerInfo =
          interceptor.getContainer(req, res, appId, appAttemptId, containerId);
      if (containerInfo != null) {
        long stopTime = Time.now();
        routerMetrics.succeededGetContainerReportRetrieved(stopTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_CONTAINER,
            TARGET_WEB_SERVICE);
        return containerInfo;
      }
    } catch (IllegalArgumentException e) {
      String msg = String.format(
          "Unable to get the Container, appId: %s, appAttemptId: %s, containerId: %s.", appId,
          appAttemptId, containerId);
      routerMetrics.incrGetContainerReportFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(msg, e);
    } catch (YarnException e) {
      routerMetrics.incrGetContainerReportFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
    }

    routerMetrics.incrGetContainerReportFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_CONTAINER,
        UNKNOWN, TARGET_WEB_SERVICE, "getContainer Failed.");
    throw new RuntimeException("getContainer Failed.");
  }

  /**
   * This method updates the Scheduler configuration, and it is reachable by
   * using {@link RMWSConsts#SCHEDULER_CONF}.
   *
   * @param mutationInfo the information for making scheduler configuration
   *        changes (supports adding, removing, or updating a queue, as well
   *        as global scheduler conf changes)
   * @param hsr the servlet request
   * @return Response containing the status code
   * @throws AuthorizationException if the user is not authorized to invoke this
   *         method
   * @throws InterruptedException if interrupted
   */
  @Override
  public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
      HttpServletRequest hsr) throws AuthorizationException, InterruptedException {

    // Make sure mutationInfo is not null.
    if (mutationInfo == null) {
      routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_SCHEDULER_CONFIGURATION,
          UNKNOWN, TARGET_WEB_SERVICE,
          "Parameter error, the schedConfUpdateInfo is empty or null.");
      throw new IllegalArgumentException(
          "Parameter error, the schedConfUpdateInfo is empty or null.");
    }

    // In federated mode, we may have a mix of multiple schedulers.
    // To make sure the correct scheduler configuration is updated,
    // users must explicitly set the subClusterId.
    String pSubClusterId = mutationInfo.getSubClusterId();
    if (StringUtils.isBlank(pSubClusterId)) {
      routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_SCHEDULER_CONFIGURATION,
          UNKNOWN, TARGET_WEB_SERVICE,
          "Parameter error, the subClusterId is empty or null.");
      throw new IllegalArgumentException("Parameter error, " +
          "the subClusterId is empty or null.");
    }

    // Get the subClusterInfo, then update the scheduler configuration.
    try {
      long startTime = clock.getTime();
      SubClusterInfo subClusterInfo = getActiveSubCluster(pSubClusterId);
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
      Response response = interceptor.updateSchedulerConfiguration(mutationInfo, hsr);
      if (response != null) {
        long endTime = clock.getTime();
        routerMetrics.succeededUpdateSchedulerConfigurationRetrieved(endTime - startTime);
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), UPDATE_SCHEDULER_CONFIGURATION,
            TARGET_WEB_SERVICE);
        return Response.status(response.getStatus()).entity(response.getEntity()).build();
      }
    } catch (NotFoundException e) {
      routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_SCHEDULER_CONFIGURATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "Get subCluster error. subClusterId = %s", pSubClusterId);
    } catch (Exception e) {
      routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_SCHEDULER_CONFIGURATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException(e,
          "UpdateSchedulerConfiguration error. subClusterId = %s", pSubClusterId);
    }

    routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), UPDATE_SCHEDULER_CONFIGURATION,
        UNKNOWN, TARGET_WEB_SERVICE, "UpdateSchedulerConfiguration Failed.");
    throw new RuntimeException("UpdateSchedulerConfiguration error. subClusterId = "
        + pSubClusterId);
  }

  /**
   * This method retrieves the Scheduler configuration from all active SubClusters,
   * and it is reachable by using {@link RMWSConsts#SCHEDULER_CONF}.
   *
   * @param hsr the servlet request
   * @return Response containing the status code
   * @throws AuthorizationException if the user is not authorized to invoke this
   *      method.
   */
  @Override
  public Response getSchedulerConfiguration(HttpServletRequest hsr)
      throws AuthorizationException {
    try {
      long startTime = clock.getTime();
      FederationConfInfo federationConfInfo = new FederationConfInfo();
      Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
      final HttpServletRequest hsrCopy = clone(hsr);
      Class[] argsClasses = new Class[]{HttpServletRequest.class};
      Object[] args = new Object[]{hsrCopy};
      ClientMethod remoteMethod = new ClientMethod("getSchedulerConfiguration", argsClasses, args);
      Map<SubClusterInfo, Response> responseMap =
          invokeConcurrent(subClustersActive, remoteMethod, Response.class);
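      // Aggregate the per-sub-cluster responses: record an error message when a
      // sub-cluster returns null or BAD_REQUEST, and tag successful ConfInfo
      // entries with their SubClusterId.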
      responseMap.forEach((subClusterInfo, response) -> {
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        if (response == null) {
          String errorMsg = subClusterId + " Can't getSchedulerConfiguration.";
          federationConfInfo.getErrorMsgs().add(errorMsg);
        } else if (response.getStatus() == Status.BAD_REQUEST.getStatusCode()) {
          String errorMsg = String.valueOf(response.getEntity());
          federationConfInfo.getErrorMsgs().add(errorMsg);
        } else if (response.getStatus() == Status.OK.getStatusCode()) {
          ConfInfo fedConfInfo = (ConfInfo) response.getEntity();
          fedConfInfo.setSubClusterId(subClusterId.getId());
          federationConfInfo.getList().add(fedConfInfo);
        }
      });
      long endTime = clock.getTime();
      routerMetrics.succeededGetSchedulerConfigurationRetrieved(endTime - startTime);
      RouterAuditLogger.logSuccess(getUser().getShortUserName(), GET_SCHEDULER_CONFIGURATION,
          TARGET_WEB_SERVICE);
      return Response.status(Status.OK).entity(federationConfInfo).build();
    } catch (NotFoundException e) {
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_SCHEDULER_CONFIGURATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
      RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
    } catch (Exception e) {
      RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_SCHEDULER_CONFIGURATION,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
      return Response.status(Status.BAD_REQUEST).entity("getSchedulerConfiguration error.").build();
    }

    routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_SCHEDULER_CONFIGURATION,
        UNKNOWN, TARGET_WEB_SERVICE, "getSchedulerConfiguration Failed.");
    throw new RuntimeException("getSchedulerConfiguration Failed.");
  }

  @Override
  public void setNextInterceptor(RESTRequestInterceptor next) {
    throw new YarnRuntimeException("setNextInterceptor is being called on "
        + "FederationInterceptorREST, which should be the last one "
        + "in the chain. Check if the interceptor pipeline configuration "
        + "is correct");
  }

  @Override
  public Response signalToContainer(String containerId, String command,
      HttpServletRequest req) {

    // Check if containerId is empty or null
    try {
      RouterServerUtil.validateContainerId(containerId);
    } catch (IllegalArgumentException e) {
      routerMetrics.incrSignalToContainerFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SIGNAL_TOCONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      throw e;
    }

    // Check if command is empty or null
    if (command == null || command.isEmpty()) {
      routerMetrics.incrSignalToContainerFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SIGNAL_TOCONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, "Parameter error, the command is empty or null.");
      throw new IllegalArgumentException("Parameter error, the command is empty or null.");
    }

    try {
      long startTime = Time.now();

      ContainerId containerIdObj = ContainerId.fromString(containerId);
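      // Derive the owning ApplicationId from the ContainerId, resolve its home
      // sub-cluster, and forward the signal to that sub-cluster's RM.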
      ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString());
      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());

      Response response = interceptor.signalToContainer(containerId, command, req);
      if (response != null) {
        long stopTime = Time.now();
        RouterAuditLogger.logSuccess(getUser().getShortUserName(), SIGNAL_TOCONTAINER,
            TARGET_WEB_SERVICE);
        routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime);
        return response;
      }
    } catch (YarnException e) {
      routerMetrics.incrSignalToContainerFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SIGNAL_TOCONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("signalToContainer Failed.", e);
    } catch (AuthorizationException e) {
      routerMetrics.incrSignalToContainerFailedRetrieved();
      RouterAuditLogger.logFailure(getUser().getShortUserName(), SIGNAL_TOCONTAINER,
          UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage());
      RouterServerUtil.logAndThrowRunTimeException("signalToContainer Author Failed.", e);
    }

    routerMetrics.incrSignalToContainerFailedRetrieved();
    RouterAuditLogger.logFailure(getUser().getShortUserName(), SIGNAL_TOCONTAINER,
        UNKNOWN, TARGET_WEB_SERVICE, "signalToContainer Failed.");
    throw new RuntimeException("signalToContainer Failed.");
  }

  @Override
  public void shutdown() {
    if (threadpool != null) {
      threadpool.shutdown();
    }
  }
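
  /**
   * Invoke the given method on every provided SubCluster in parallel and collect
   * the per-SubCluster results.
   *
   * @param clusterIds the SubClusters to query.
   * @param request the method (name, argument types and values) to invoke on each
   *        DefaultRequestInterceptorREST.
   * @param clazz the expected response type.
   * @param <R> the response type.
   * @return a map from SubClusterInfo to the response returned by that SubCluster.
   * @throws YarnException if a SubCluster fails and allowPartialResult is false.
   */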

  private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
      ClientMethod request, Class<R> clazz) throws YarnException {

    Map<SubClusterInfo, R> results = new HashMap<>();

    // If a sub-cluster access error occurs, whether the exception is thrown
    // is decided by the user configuration (allowPartialResult).
    // Send the requests in parallel.
    CompletionService<SubClusterResult<R>> compSvc = new ExecutorCompletionService<>(threadpool);

    // Each task returns a SubClusterResult that exposes the exception information
    // of the remote call: if the request succeeds, the response is set and the
    // exception is null; if it fails, the response is null and the exception is set.
    for (final SubClusterInfo info : clusterIds) {
      compSvc.submit(() -> {
        DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
            info.getSubClusterId(), info.getRMWebServiceAddress());
        try {
          Method method = DefaultRequestInterceptorREST.class.
              getMethod(request.getMethodName(), request.getTypes());
          Object retObj = method.invoke(interceptor, request.getParams());
          R ret = clazz.cast(retObj);
          return new SubClusterResult<>(info, ret, null);
        } catch (Exception e) {
          LOG.error("SubCluster {} failed to call {} method.",
              info.getSubClusterId(), request.getMethodName(), e);
          Throwable cause = e.getCause();
          if (cause instanceof YarnException) {
            return new SubClusterResult<>(info, null, (YarnException) cause);
          }
          if (cause instanceof IllegalArgumentException) {
            return new SubClusterResult<>(info, null, (IllegalArgumentException) cause);
          }
          if (cause instanceof ForbiddenException) {
            return new SubClusterResult<>(info, null, (ForbiddenException) cause);
          }
          return new SubClusterResult<>(info, null, e);
        }
      });
    }

    Exception lastException = null;
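    // Collect the results as they complete: record successful responses and
    // remember the last failure. If allowPartialResult is false, the first
    // failure aborts the whole invocation.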
    for (int i = 0; i < clusterIds.size(); i++) {
      SubClusterInfo subClusterInfo = null;
      try {
        Future<SubClusterResult<R>> future = compSvc.take();
        SubClusterResult<R> result = future.get();
        subClusterInfo = result.getSubClusterInfo();

        R response = result.getResponse();
        if (response != null) {
          results.put(subClusterInfo, response);
        }

        Exception exception = result.getException();
        if (exception != null) {
          lastException = exception;
          throw exception;
        }
      } catch (Throwable e) {
        String subClusterId = subClusterInfo != null ?
            subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
        LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
        // If allowPartialResult=false, it means that if an exception occurs in a subCluster,
        // an exception will be thrown directly.
        if (!allowPartialResult) {
          throw new YarnException("SubCluster " + subClusterId +
              " failed to " + request.getMethodName() + " report.", e);
        }
      }
    }

    if (results.isEmpty() && lastException != null) {
      Throwable cause = lastException.getCause();
      if (cause != null) {
        throw new YarnRuntimeException(cause.getMessage());
      }
      throw new YarnRuntimeException(lastException.getMessage());
    }
    return results;
  }

  /**
   * Get the HomeSubClusterInfo according to the ApplicationId.
   *
   * @param appId applicationId.
   * @return the home SubClusterInfo.
   * @throws YarnException on failure.
   */
  private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
      throws YarnException {

    if (StringUtils.isBlank(appId)) {
      throw new IllegalArgumentException("applicationId can't null or empty.");
    }

    try {
      ApplicationId applicationId = ApplicationId.fromString(appId);
      SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
      if (subClusterId == null) {
        RouterServerUtil.logAndThrowException(null,
            "Can't get HomeSubCluster by applicationId %s", applicationId);
      }
      return federationFacade.getSubCluster(subClusterId);
    } catch (IllegalArgumentException e){
      throw new IllegalArgumentException(e);
    } catch (YarnException e) {
      RouterServerUtil.logAndThrowException(e,
          "Get HomeSubClusterInfo by applicationId %s failed.", appId);
    }
    throw new YarnException("Unable to get subCluster by applicationId = " + appId);
  }

  /**
   * Get the HomeSubClusterInfo according to the ReservationId.
   *
   * @param resId reservationId.
   * @return the home SubClusterInfo.
   * @throws YarnException on failure.
   */
  private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
      throws YarnException {
    try {
      ReservationId reservationId = ReservationId.parseReservationId(resId);
      SubClusterId subClusterId = federationFacade.getReservationHomeSubCluster(reservationId);
      if (subClusterId == null) {
        RouterServerUtil.logAndThrowException(null,
            "Can't get HomeSubCluster by reservationId %s", resId);
      }
      return federationFacade.getSubCluster(subClusterId);
    } catch (YarnException | IOException e) {
      RouterServerUtil.logAndThrowException(e,
          "Get HomeSubClusterInfo by reservationId %s failed.", resId);
    }
    throw new YarnException("Unable to get subCluster by reservationId = " + resId);
  }

  @VisibleForTesting
  public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
    return appInfosCaches;
  }

  @VisibleForTesting
  public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
    return interceptors;
  }

  public void setAllowPartialResult(boolean allowPartialResult) {
    this.allowPartialResult = allowPartialResult;
  }

  @VisibleForTesting
  public Map<SubClusterInfo, NodesInfo> invokeConcurrentGetNodeLabel()
      throws IOException, YarnException {
    Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters();
    Class[] argsClasses = new Class[]{String.class};
    Object[] args = new Object[]{null};
    ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args);
    return invokeConcurrent(subClustersActive, remoteMethod, NodesInfo.class);
  }
}