ZookeeperFederationStateStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.federation.store.impl;

import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;

import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationHomeSubClusterProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.ApplicationHomeSubClusterPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;

import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;

/**
 * ZooKeeper implementation of {@link FederationStateStore}.
 * The znode structure is as follows:
 *
 * ROOT_DIR_PATH
 * |--- MEMBERSHIP
 * |     |----- SC1
 * |     |----- SC2
 * |--- APPLICATION
 * |     |----- HIERARCHIES
 * |     |        |----- 1
 * |     |        |      |----- (#ApplicationId barring last character)
 * |     |        |      |       |----- (#Last character of ApplicationId)
 * |     |        |      |       |       |----- APP Data
 * |     |        |      ....
 * |     |        |
 * |     |        |----- 2
 * |     |        |      |----- (#ApplicationId barring last 2 characters)
 * |     |        |      |       |----- (#Last 2 characters of ApplicationId)
 * |     |        |      |       |       |----- APP Data
 * |--- POLICY
 * |     |----- QUEUE1
 * |     |----- QUEUE2
 * |--- RESERVATION
 * |     |----- RESERVATION1
 * |     |----- RESERVATION2
 * |--- ROUTER_RM_DT_SECRET_MANAGER_ROOT
 * |     |----- ROUTER_RM_DELEGATION_TOKENS_ROOT
 * |     |       |----- RM_DELEGATION_TOKEN_1
 * |     |       |----- RM_DELEGATION_TOKEN_2
 * |     |       |----- RM_DELEGATION_TOKEN_3
 * |     |----- ROUTER_RM_DT_MASTER_KEYS_ROOT
 * |     |       |----- DELEGATION_KEY_1
 * |     |----- ROUTER_RM_DT_SEQUENTIAL_NUMBER
 * |     |----- ROUTER_RM_DT_MASTER_KEY_ID
 */
public class ZookeeperFederationStateStore implements FederationStateStore {

  private static final Logger LOG =
      LoggerFactory.getLogger(ZookeeperFederationStateStore.class);

  /** Names of the per-entity znodes created directly under the base znode. */
  private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
  private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
  private final static String ROOT_ZNODE_NAME_POLICY = "policies";
  private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";

  /** Name of the version znode created directly under the base znode. */
  protected static final String ROOT_ZNODE_NAME_VERSION = "version";

  /**
   * Names of the delegation token znodes. The secret manager root lives under
   * the base znode; the master keys, tokens, sequence number and master key id
   * znodes live under the secret manager root (see {@code init}).
   */
  private final static String ROUTER_RM_DT_SECRET_MANAGER_ROOT = "router_rm_dt_secret_manager_root";
  private static final String ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
      "router_rm_dt_master_keys_root";
  private static final String ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
      "router_rm_delegation_tokens_root";
  private static final String ROUTER_RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
      "router_rm_dt_sequential_number";
  private static final String ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME =
      "router_rm_dt_master_key_id";
  /** Prefixes presumably used to name individual key / token znodes - confirm at usage. */
  private static final String ROUTER_RM_DELEGATION_KEY_PREFIX = "delegation_key_";
  private static final String ROUTER_RM_DELEGATION_TOKEN_PREFIX = "rm_delegation_token_";

  /** Interface to Zookeeper. */
  private ZKCuratorManager zkManager;

  /**
   * Delegation token sequence number state. {@code init} reads the batch size
   * from the configuration, reserves a first batch through the shared counter
   * and sets {@code currentMaxSeqNum = currentSeqNum + seqNumBatchSize}.
   */
  private int seqNumBatchSize;
  private int currentSeqNum;
  private int currentMaxSeqNum;
  /** Shared counter backing the delegation token sequence number znode. */
  private SharedCount delTokSeqCounter;
  /** Shared counter backing the master key id znode. */
  private SharedCount keyIdSeqCounter;

  /** Directory to store the state store data. */
  private String baseZNode;

  /** Full paths of the per-entity znodes, derived from {@code baseZNode} in {@code init}. */
  private String appsZNode;
  private String membershipZNode;
  private String policiesZNode;
  private String reservationsZNode;
  private String versionNode;
  /** Upper bound on the number of applications returned by getApplicationsHomeSubCluster. */
  private int maxAppsInStateStore;

  /**
   * Application hierarchy paths. NOTE(review): presumably maps a split index
   * to the corresponding hierarchy znode path; it is populated outside this
   * part of the file - confirm.
   */
  private Map<Integer, String> routerAppRootHierarchies;
  /** Directories to store the delegation token data. */
  private String routerRMDTSecretManagerRoot;
  private String routerRMDTMasterKeysRootPath;
  private String routerRMDelegationTokensRootPath;
  private String routerRMSequenceNumberPath;
  private String routerRMMasterKeyIdPath;

  /** Configured application id split index; validated against HIERARCHIES_LEVEL in init. */
  private int appIdNodeSplitIndex = 0;
  /** Largest split index accepted by {@code init}. */
  private final static int HIERARCHIES_LEVEL = 4;

  @VisibleForTesting
  public static final String ROUTER_APP_ROOT_HIERARCHIES = "HIERARCHIES";

  /** Clock used to time each store operation. */
  private volatile Clock clock = SystemClock.getInstance();

  /** Version of this state store implementation (1.1). */
  protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1);

  /** Collector that receives the start/end time of each store operation. */
  @VisibleForTesting
  private ZKFederationStateStoreOpDurations opDurations =
      ZKFederationStateStoreOpDurations.getInstance();

  /** Configuration handed to {@code init}. */
  private Configuration configuration;

  /**
   * Enumerates the different app attempt state store operations:
   * store, update and remove.
   */
  private enum AppAttemptOp { STORE, UPDATE, REMOVE }

  /**
   * Immutable pair of a full application znode path and the split index
   * that the path corresponds to.
   */
  private final static class AppNodeSplitInfo {
    private final int splitIndex;
    private final String path;

    AppNodeSplitInfo(String nodePath, int nodeSplitIndex) {
      path = nodePath;
      splitIndex = nodeSplitIndex;
    }
  }

  /**
   * Initializes the state store: connects to ZooKeeper, computes the znode
   * paths for every entity, creates the base znodes and starts the shared
   * counters used for the delegation token sequence number and the master
   * key id.
   *
   * @param conf configuration providing the ZooKeeper address, the parent
   *          znode path, the maximum number of applications, the application
   *          id split index and the sequence number batch size.
   * @throws YarnException if the ZooKeeper connection cannot be established
   *           or one of the shared counters cannot be started.
   */
  @Override
  public void init(Configuration conf) throws YarnException {

    LOG.info("Initializing ZooKeeper connection");
    this.configuration = conf;
    maxAppsInStateStore = conf.getInt(
       YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
       YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);

    baseZNode = conf.get(
        YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
    String zkHostPort = conf.get(YarnConfiguration.FEDERATION_STATESTORE_ZK_ADDRESS);
    try {
      this.zkManager = new ZKCuratorManager(conf);
      this.zkManager.start(zkHostPort);
    } catch (IOException e) {
      // Fail fast with the real cause. Continuing without a started manager
      // only defers the failure to the znode/counter setup below, where it
      // surfaces with a misleading message.
      LOG.error("Cannot initialize the ZK connection", e);
      throw new YarnException("Cannot initialize the ZK connection", e);
    }

    // Base znodes
    membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
    appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
    policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
    reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
    versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION);

    // Initialize hierarchical path
    initHierarchiesPath();

    // Out-of-range split indexes fall back to the default value.
    appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
         YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
    if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > HIERARCHIES_LEVEL) {
      LOG.info("Invalid value {} for config {} specified. Resetting it to {}",
          appIdNodeSplitIndex, YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
          YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
      appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
    }

    // delegation token znodes
    routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT);
    routerRMDTMasterKeysRootPath = getNodePath(routerRMDTSecretManagerRoot,
        ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
    routerRMDelegationTokensRootPath = getNodePath(routerRMDTSecretManagerRoot,
        ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
    routerRMSequenceNumberPath = getNodePath(routerRMDTSecretManagerRoot,
        ROUTER_RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
    routerRMMasterKeyIdPath = getNodePath(routerRMDTSecretManagerRoot,
        ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME);

    // Create base znode for each entity
    createBaseZNodeForEachEntity();

    // Distributed sequenceNum.
    try {
      seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
          ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);

      // A freshly constructed counter is never null, so it is started directly.
      delTokSeqCounter = new SharedCount(zkManager.getCurator(), routerRMSequenceNumberPath, 0);
      delTokSeqCounter.start();

      // the first batch range should be allocated during this starting window
      // by calling the incrSharedCount
      currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
      currentMaxSeqNum = currentSeqNum + seqNumBatchSize;

      LOG.info("Fetched initial range of seq num, from {} to {} ",
          currentSeqNum + 1, currentMaxSeqNum);
    } catch (Exception e) {
      throw new YarnException("Could not start Sequence Counter.", e);
    }

    // Distributed masterKeyId.
    try {
      keyIdSeqCounter = new SharedCount(zkManager.getCurator(), routerRMMasterKeyIdPath, 0);
      keyIdSeqCounter.start();
    } catch (Exception e) {
      throw new YarnException("Could not start Master KeyId Counter", e);
    }
  }

  /**
   * Shuts the store down: closes both shared counters (a failure to close
   * either one is logged and does not stop the shutdown) and then closes the
   * ZooKeeper manager.
   *
   * @throws Exception if closing the ZooKeeper manager fails.
   */
  @Override
  public void close() throws Exception {

    if (delTokSeqCounter != null) {
      try {
        delTokSeqCounter.close();
        delTokSeqCounter = null;
      } catch (Exception e) {
        LOG.error("Could not Stop Delegation Token Counter.", e);
      }
    }

    if (keyIdSeqCounter != null) {
      try {
        keyIdSeqCounter.close();
        keyIdSeqCounter = null;
      } catch (Exception e) {
        LOG.error("Could not stop Master KeyId Counter.", e);
      }
    }

    if (zkManager == null) {
      return;
    }
    zkManager.close();
  }

  /**
   * Register the home {@code SubClusterId} of the newly submitted
   * {@code ApplicationId}. After the write, the mapping actually stored in
   * ZooKeeper is read back and returned, so if a mapping for the application
   * already existed the {@code SubClusterId} in the response might be
   * different from that in the {@code AddApplicationHomeSubClusterRequest}.
   *
   * @param request the request to register a new application with its home sub-cluster.
   * @return upon successful registration of the application in the StateStore,
   *         {@code AddApplicationHomeSubClusterResponse} containing the home
   *         sub-cluster of the application. Otherwise, an exception reporting
   *         reason for a failure.
   * @throws YarnException if the request is invalid, the mapping cannot be
   *           written, or the stored mapping cannot be read back.
   */
  @Override
  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
      AddApplicationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

    // parse parameters
    // We need to get applicationId, subClusterId, appSubmissionContext 3 parameters.
    ApplicationHomeSubCluster requestApplicationHomeSubCluster =
        request.getApplicationHomeSubCluster();
    ApplicationId requestApplicationId = requestApplicationHomeSubCluster.getApplicationId();
    SubClusterId requestSubClusterId = requestApplicationHomeSubCluster.getHomeSubCluster();
    ApplicationSubmissionContext requestApplicationSubmissionContext =
         requestApplicationHomeSubCluster.getApplicationSubmissionContext();

    LOG.debug("applicationId = {}, subClusterId = {}, appSubmissionContext = {}.",
        requestApplicationId, requestSubClusterId, requestApplicationSubmissionContext);

    // Try to write the subcluster
    try {
      storeOrUpdateApplicationHomeSubCluster(requestApplicationId,
          requestApplicationHomeSubCluster, false);
    } catch (Exception e) {
      // Keep the underlying reason in the message instead of dropping it.
      String errMsg = "Cannot add application home subcluster for " + requestApplicationId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Check for the actual subcluster: read back the ApplicationHomeSubCluster
    // actually stored in ZK according to the applicationId.
    ApplicationHomeSubCluster actualAppHomeSubCluster = null;
    try {
      actualAppHomeSubCluster = getApplicationHomeSubCluster(requestApplicationId);
    } catch (Exception e) {
      String errMsg = "Cannot check app home subcluster for " + requestApplicationId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    if (actualAppHomeSubCluster != null) {
      SubClusterId responseSubClusterId = actualAppHomeSubCluster.getHomeSubCluster();
      long end = clock.getTime();
      opDurations.addAppHomeSubClusterDuration(start, end);
      return AddApplicationHomeSubClusterResponse.newInstance(responseSubClusterId);
    }

    // The mapping is missing right after the write: report it explicitly
    // rather than through an accidental NullPointerException.
    String errMsg = "Cannot check app home subcluster for " + requestApplicationId
        + ": no mapping found after store";
    FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);

    // Throw YarnException.
    throw new YarnException("Cannot addApplicationHomeSubCluster by request");
  }

  /**
   * Replace the stored home {@code SubClusterId} of an application that was
   * registered earlier. The application must already have a mapping in the
   * store; otherwise the update is rejected.
   *
   * @param request the request carrying the application and its new home
   *          sub-cluster.
   * @return an empty response when the mapping was updated in the StateStore.
   * @throws YarnException if the request is invalid, the application is not
   *           known to the store, or the write fails.
   */
  @Override
  public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
      UpdateApplicationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

    ApplicationHomeSubCluster newMapping = request.getApplicationHomeSubCluster();
    ApplicationId appId = newMapping.getApplicationId();

    // The application has to be present before it can be updated.
    ApplicationHomeSubCluster storedMapping = getApplicationHomeSubCluster(appId);
    if (storedMapping == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + appId + " does not exist");
    }

    LOG.debug("applicationId = {}, oldHomeSubClusterId = {}, newHomeSubClusterId = {}, " +
        "appSubmissionContext = {}.",
        appId,
        storedMapping.getHomeSubCluster(),
        newMapping.getHomeSubCluster(),
        newMapping.getApplicationSubmissionContext());

    // Overwrite the stored ApplicationHomeSubCluster with the requested one.
    storeOrUpdateApplicationHomeSubCluster(appId, newMapping, true);

    opDurations.addUpdateAppHomeSubClusterDuration(start, clock.getTime());
    return UpdateApplicationHomeSubClusterResponse.newInstance();
  }

  /**
   * Look up the home sub-cluster recorded for the {@code ApplicationId} in
   * the request.
   *
   * @param request identifies the application and states whether the
   *          application submission context should be part of the answer.
   * @return {@code GetApplicationHomeSubClusterResponse} with the application,
   *         its home sub-cluster and creation time, plus the submission
   *         context when the request asked for it.
   * @throws YarnException if the request is invalid or the application is not
   *           known to the store.
   */
  @Override
  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
      GetApplicationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

    ApplicationId appId = request.getApplicationId();
    ApplicationHomeSubCluster storedMapping = getApplicationHomeSubCluster(appId);
    if (storedMapping == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + appId + " does not exist");
    }

    // Collect the data that is always returned.
    SubClusterId homeSubCluster = storedMapping.getHomeSubCluster();
    long createTime = storedMapping.getCreateTime();

    opDurations.addGetAppHomeSubClusterDuration(start, clock.getTime());

    // Without an explicit ask, the submission context is left out.
    if (!request.getContainsAppSubmissionContext()) {
      return GetApplicationHomeSubClusterResponse.newInstance(appId,
          homeSubCluster, createTime);
    }

    return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster,
        createTime, storedMapping.getApplicationSubmissionContext());
  }

  /**
   * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
   * submitted applications to their home sub-cluster. The applications are
   * filtered by the sub-cluster given in the request, ordered from the most
   * recently created to the oldest, and capped at the configured maximum
   * number of applications.
   *
   * @param request carries the sub-cluster used to filter the applications.
   * @return the mapping of the selected applications to their home sub-cluster
   * @throws YarnException if the request is missing or the applications cannot
   *           be loaded from the store.
   */
  @Override
  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
      GetApplicationsHomeSubClusterRequest request) throws YarnException {

    if (request == null) {
      throw new YarnException("Missing getApplicationsHomeSubCluster request");
    }

    try {
      long start = clock.getTime();
      SubClusterId requestSC = request.getSubClusterId();
      // Filter before sorting so only the matching applications are sorted.
      // The sort is stable for this ordered stream, so the outcome is the same
      // as sorting everything first and filtering afterwards.
      List<ApplicationHomeSubCluster> result = loadRouterApplications().stream()
          .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
          .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
          .limit(maxAppsInStateStore)
          .collect(Collectors.toList());
      long end = clock.getTime();
      opDurations.addGetAppsHomeSubClusterDuration(start, end);
      LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
      return GetApplicationsHomeSubClusterResponse.newInstance(result);
    } catch (Exception e) {
      String errMsg = "Cannot get apps: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    throw new YarnException("Cannot get app by request");
  }

  /**
   * Remove the home {@code SubClusterId} mapping of a previously submitted
   * {@code ApplicationId}. The application znode is looked up under the
   * currently configured split index first and under an alternate path
   * otherwise.
   *
   * @param request the request naming the application whose mapping is to be
   *          deleted.
   * @return an empty response when the mapping was removed from the StateStore.
   * @throws YarnException if the request is invalid, the application does not
   *           exist, or the znode cannot be checked or deleted.
   */
  @Override
  public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
      DeleteApplicationHomeSubClusterRequest request) throws YarnException {
    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

    ApplicationId appId = request.getApplicationId();
    String appIdStr = appId.toString();
    String nodeToRemove = getLeafAppIdNodePath(appIdStr, false);
    int nodeSplitIndex = appIdNodeSplitIndex;

    boolean found = true;
    try {
      if (!exists(nodeToRemove)) {
        // Not under the configured split index: try the alternate location.
        AppNodeSplitInfo alternate = getAlternatePath(appIdStr);
        found = alternate != null;
        if (found) {
          nodeToRemove = alternate.path;
          nodeSplitIndex = alternate.splitIndex;
        }
      }
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot check app: " + e.getMessage());
    }

    if (!found) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + appId + " does not exist");
    }

    try {
      zkManager.delete(nodeToRemove);
      // The parent app node may have to go as well.
      checkRemoveParentAppNode(nodeToRemove, nodeSplitIndex);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot delete app: " + e.getMessage());
    }

    opDurations.addDeleteAppHomeSubClusterDuration(start, clock.getTime());
    return DeleteApplicationHomeSubClusterResponse.newInstance();
  }

  /**
   * Register a sub-cluster: stamp the supplied {@code SubClusterInfo} with the
   * current time as its last heartbeat and write it to the store.
   *
   * @param request the request carrying the sub-cluster information.
   * @return an empty response when the sub-cluster was stored.
   * @throws YarnException if the request is invalid or the write fails.
   */
  @Override
  public SubClusterRegisterResponse registerSubCluster(
      SubClusterRegisterRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);

    SubClusterInfo info = request.getSubClusterInfo();
    SubClusterId id = info.getSubClusterId();

    // Registration counts as the first heartbeat.
    info.setLastHeartBeat(getCurrentTime());

    try {
      putSubclusterInfo(id, info, true);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot register subcluster: " + e.getMessage());
    }

    opDurations.addRegisterSubClusterDuration(start, clock.getTime());
    return SubClusterRegisterResponse.newInstance();
  }

  /**
   * Deregister a sub-cluster: load its stored information, set the state
   * given in the request and write it back.
   *
   * @param request the request naming the sub-cluster and its new state.
   * @return an empty response when the state was stored.
   * @throws YarnException if the request is invalid, the sub-cluster is not
   *           found, or the write fails.
   */
  @Override
  public SubClusterDeregisterResponse deregisterSubCluster(
      SubClusterDeregisterRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);

    SubClusterId id = request.getSubClusterId();
    SubClusterState newState = request.getState();

    // Read the current record, then update it in place.
    SubClusterInfo stored = getSubclusterInfo(id);
    if (stored != null) {
      stored.setState(newState);
      putSubclusterInfo(id, stored, true);
    } else {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + id + " not found");
    }

    opDurations.addDeregisterSubClusterDuration(start, clock.getTime());
    return SubClusterDeregisterResponse.newInstance();
  }

  /**
   * Record a heartbeat for a registered sub-cluster: refresh its last
   * heartbeat time and store the state and capability sent in the request.
   *
   * @param request the heartbeat carrying the sub-cluster id, state and
   *          capability.
   * @return an empty response when the heartbeat was stored.
   * @throws YarnException if the request is invalid, the sub-cluster does not
   *           exist, or the write fails.
   */
  @Override
  public SubClusterHeartbeatResponse subClusterHeartbeat(
      SubClusterHeartbeatRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);

    SubClusterId id = request.getSubClusterId();
    SubClusterInfo stored = getSubclusterInfo(id);
    if (stored == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + id + " does not exist; cannot heartbeat");
    }

    stored.setLastHeartBeat(getCurrentTime());
    stored.setState(request.getState());
    stored.setCapability(request.getCapability());
    putSubclusterInfo(id, stored, true);

    opDurations.addSubClusterHeartbeatDuration(start, clock.getTime());
    return SubClusterHeartbeatResponse.newInstance();
  }

  /**
   * Fetch the stored information of a single sub-cluster.
   *
   * @param request the request naming the sub-cluster.
   * @return the response wrapping the sub-cluster information, or
   *         {@code null} when the sub-cluster does not exist.
   * @throws YarnException if the request is invalid or the read fails.
   */
  @Override
  public GetSubClusterInfoResponse getSubCluster(
      GetSubClusterInfoRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);

    SubClusterId id = request.getSubClusterId();
    SubClusterInfo found = null;
    try {
      found = getSubclusterInfo(id);
      if (found == null) {
        // An unknown sub-cluster is reported as a null response, not an error.
        LOG.warn("The queried SubCluster: {} does not exist.", id);
        return null;
      }
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot get subcluster: " + e.getMessage());
    }

    opDurations.addGetSubClusterDuration(start, clock.getTime());
    return GetSubClusterInfoResponse.newInstance(found);
  }

  /**
   * Lists the subclusters stored under the membership znode.
   *
   * <p>Listing the children and reading each child are separate ZooKeeper
   * calls, so a child may disappear in between. Such children are skipped
   * with a warning instead of failing the whole call or leaking a
   * {@code null} entry into the result.
   *
   * @param request request telling whether inactive subclusters should be
   *     filtered out.
   * @return response holding the matching subclusters.
   * @throws YarnException if the membership information cannot be read.
   */
  @Override
  public GetSubClustersInfoResponse getSubClusters(
      GetSubClustersInfoRequest request) throws YarnException {
    long start = clock.getTime();
    List<SubClusterInfo> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(membershipZNode)) {
        SubClusterId subClusterId = SubClusterId.newInstance(child);
        SubClusterInfo info = getSubclusterInfo(subClusterId);
        if (info == null) {
          // The znode vanished (or holds no data) after it was listed.
          LOG.warn("SubCluster: {} does not exist.", subClusterId);
          continue;
        }
        if (!request.getFilterInactiveSubClusters() ||
            info.getState().isActive()) {
          result.add(info);
        }
      }
    } catch (Exception e) {
      String errMsg = "Cannot get subclusters: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetSubClustersDuration(start, end);
    return GetSubClustersInfoResponse.newInstance(result);
  }


  /**
   * Looks up the policy configuration stored for a queue.
   *
   * @param request request naming the queue.
   * @return the policy configuration response, or {@code null} when no
   *     policy is stored for the queue.
   * @throws YarnException if the policy cannot be read.
   */
  @Override
  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
    long start = clock.getTime();
    FederationPolicyStoreInputValidator.validate(request);
    String queue = request.getQueue();

    SubClusterPolicyConfiguration queuePolicy = null;
    try {
      queuePolicy = getPolicy(queue);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot get policy: " + e.getMessage());
    }

    if (queuePolicy != null) {
      opDurations.addGetPolicyConfigurationDuration(start, clock.getTime());
      return GetSubClusterPolicyConfigurationResponse.newInstance(queuePolicy);
    }

    LOG.warn("Policy for queue: {} does not exist.", queue);
    return null;
  }

  /**
   * Stores the policy configuration of a queue, overwriting any existing one.
   *
   * @param request request carrying the policy configuration to store.
   * @return an empty response.
   * @throws YarnException if the policy cannot be written.
   */
  @Override
  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
    long start = clock.getTime();
    FederationPolicyStoreInputValidator.validate(request);
    SubClusterPolicyConfiguration newPolicy = request.getPolicyConfiguration();
    try {
      // The queue name is the key under which the policy is stored.
      putPolicy(newPolicy.getQueue(), newPolicy, true);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot set policy: " + e.getMessage());
    }
    opDurations.addSetPolicyConfigurationDuration(start, clock.getTime());
    return SetSubClusterPolicyConfigurationResponse.newInstance();
  }

  /**
   * Lists every policy configuration stored under the policies znode.
   *
   * @param request the list request.
   * @return response holding the policies that could be read; children
   *     without a readable policy are skipped with a warning.
   * @throws YarnException if the policies cannot be read.
   */
  @Override
  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
    long start = clock.getTime();
    List<SubClusterPolicyConfiguration> policies = new ArrayList<>();

    try {
      for (String queue : zkManager.getChildren(policiesZNode)) {
        SubClusterPolicyConfiguration queuePolicy = getPolicy(queue);
        if (queuePolicy != null) {
          policies.add(queuePolicy);
        } else {
          LOG.warn("Policy for queue: {} does not exist.", queue);
        }
      }
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot get policies: " + e.getMessage());
    }

    opDurations.addGetPoliciesConfigurationsDuration(start, clock.getTime());
    return GetSubClusterPoliciesConfigurationsResponse.newInstance(policies);
  }

  /**
   * Deletes the policy configurations of the queues named in the request.
   *
   * <p>Each queue is handled independently by
   * {@link #deletePolicyConfigurationByQueue(String)}.
   *
   * @param request request listing the queues whose policies are deleted.
   * @return an empty response.
   * @throws YarnException if the request does not pass validation.
   */
  @Override
  public DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations(
      DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException {
    FederationPolicyStoreInputValidator.validate(request);
    request.getQueues().forEach(this::deletePolicyConfigurationByQueue);
    return DeleteSubClusterPoliciesConfigurationsResponse.newInstance();
  }

  /**
   * Deletes the policy znode of a single queue on a best-effort basis.
   *
   * <p>Failures are logged rather than thrown, so that one failing queue
   * does not stop a batch deletion.
   *
   * @param queue name of the queue whose policy is deleted.
   */
  private void deletePolicyConfigurationByQueue(String queue) {
    String policyZNode = getNodePath(policiesZNode, queue);

    boolean exists;
    try {
      exists = zkManager.exists(policyZNode);
    } catch (Exception e) {
      LOG.error("An error occurred when checking whether the queue = {} policy exists.", queue, e);
      // The existence of the policy is unknown here; stop instead of also
      // reporting (incorrectly) that it does not exist.
      return;
    }

    if (!exists) {
      LOG.error("The policy of the queue = {} does not exist.", queue);
      return;
    }

    try {
      zkManager.delete(policyZNode);
    } catch (Exception e) {
      LOG.error("Queue {} policy cannot be deleted.", queue, e);
    }
  }

  /**
   * Deletes the policies znode and then recreates it as an empty base path.
   *
   * @param request the delete-all request.
   * @return an empty response.
   * @throws Exception if the policies znode cannot be deleted or recreated.
   */
  @Override
  public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
      DeletePoliciesConfigurationsRequest request) throws Exception {

    zkManager.delete(policiesZNode);

    // Recreate the base path so later policy writes have a parent znode.
    try {
      List<ACL> acls = ZKCuratorManager.getZKAcls(configuration);
      zkManager.createRootDirRecursively(policiesZNode, acls);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot create base directories: " + e.getMessage());
    }

    return DeletePoliciesConfigurationsResponse.newInstance();
  }

  /**
   * Returns the version constant of this state store implementation.
   *
   * @return {@code CURRENT_VERSION_INFO}.
   */
  @Override
  public Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }

  /**
   * Loads the version stored in the version znode.
   *
   * @return the stored version, or {@code null} when the version znode does
   *     not exist or holds no data.
   * @throws Exception if the znode cannot be read or its content cannot be
   *     parsed.
   */
  @Override
  public Version loadVersion() throws Exception {
    if (!exists(versionNode)) {
      return null;
    }
    byte[] versionBytes = get(versionNode);
    if (versionBytes == null) {
      return null;
    }
    return new VersionPBImpl(VersionProto.parseFrom(versionBytes));
  }

  /**
   * Writes the current version into the version znode.
   *
   * @throws Exception if the version znode cannot be checked or written.
   */
  @Override
  public void storeVersion() throws Exception {
    VersionPBImpl currentVersion = (VersionPBImpl) CURRENT_VERSION_INFO;
    byte[] serialized = currentVersion.getProto().toByteArray();
    // An existing znode is overwritten; a missing one is created.
    boolean alreadyStored = exists(versionNode);
    put(versionNode, serialized, alreadyStored);
  }

  /**
   * Rebuilds the map from application-id split index to its root znode.
   *
   * <p>Index 0 maps to the applications znode itself; indexes
   * 1..HIERARCHIES_LEVEL map to a child named after the index under the
   * hierarchies znode.
   */
  private void initHierarchiesPath() {
    String hierarchiesRoot = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
    routerAppRootHierarchies = new HashMap<>();
    routerAppRootHierarchies.put(0, appsZNode);
    int level = 1;
    while (level <= HIERARCHIES_LEVEL) {
      String levelPath = getNodePath(hierarchiesRoot, Integer.toString(level));
      routerAppRootHierarchies.put(level, levelPath);
      level++;
    }
  }

  /**
   * Creates the base znodes used by the store: membership, applications
   * (including the per-split-index hierarchy roots), policies, reservations,
   * the delegation-token secret manager paths and the version znode.
   *
   * @throws YarnException if any base znode cannot be created.
   */
  private void createBaseZNodeForEachEntity() throws YarnException {
    try {
      List<ACL> acls = ZKCuratorManager.getZKAcls(configuration);
      zkManager.createRootDirRecursively(membershipZNode, acls);
      zkManager.createRootDirRecursively(appsZNode, acls);

      // Application hierarchy roots: the parent first, then one per level.
      String hierarchiesRoot = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
      zkManager.createRootDirRecursively(hierarchiesRoot);
      for (int level = 1; level <= HIERARCHIES_LEVEL; level++) {
        String levelRoot = routerAppRootHierarchies.get(level);
        zkManager.createRootDirRecursively(levelRoot);
      }

      zkManager.createRootDirRecursively(policiesZNode, acls);
      zkManager.createRootDirRecursively(reservationsZNode, acls);
      zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, acls);
      zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, acls);
      zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, acls);
      zkManager.createRootDirRecursively(versionNode, acls);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot create base directories: " + e.getMessage());
    }
  }

  /**
   * Wipes the state store and recreates its empty base layout.
   *
   * <p>Every base znode is deleted, the application hierarchy path map is
   * rebuilt, and the base znodes are created again.
   *
   * @throws Exception if a znode cannot be deleted or the base znodes cannot
   *     be recreated.
   */
  @Override
  public void deleteStateStore() throws Exception {

    // Cleaning ZNodes and their child nodes;
    // after the cleaning is complete, the ZNodes will no longer exist.
    zkManager.delete(appsZNode);
    zkManager.delete(membershipZNode);
    zkManager.delete(policiesZNode);
    zkManager.delete(reservationsZNode);
    zkManager.delete(routerRMDTSecretManagerRoot);
    zkManager.delete(routerRMDTMasterKeysRootPath);
    zkManager.delete(routerRMDelegationTokensRootPath);
    zkManager.delete(versionNode);

    // Initialize hierarchical path
    // (createBaseZNodeForEachEntity reads the map this call rebuilds).
    initHierarchiesPath();

    // We will continue to create ZNodes to ensure that the base path exists.
    createBaseZNodeForEachEntity();
  }

  /**
   * Reads the home-subcluster record stored for an application.
   *
   * @param appId Application identifier.
   * @return the stored ApplicationHomeSubCluster, or {@code null} when the
   *     application znode holds no data.
   * @throws YarnException If the znode cannot be read or its content cannot
   *     be parsed.
   */
  private ApplicationHomeSubCluster getApplicationHomeSubCluster(
      final ApplicationId appId) throws YarnException{

    String appZNode = getLeafAppIdNodePath(appId.toString(), false);

    byte[] raw = get(appZNode);
    if (raw == null) {
      return null;
    }

    try {
      ApplicationHomeSubClusterProto proto = ApplicationHomeSubClusterProto.parseFrom(raw);
      return new ApplicationHomeSubClusterPBImpl(proto);
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse application at " + appZNode);
    }
    return null;
  }

  /**
   * Serializes an ApplicationHomeSubCluster and stores it under the znode
   * derived from the application id.
   *
   * @param applicationId ApplicationId.
   * @param applicationHomeSubCluster ApplicationHomeSubCluster.
   * @param update false, add records; true, update records.
   * @throws YarnException wrapping any failure raised while serializing or
   *     writing the record.
   */
  private void storeOrUpdateApplicationHomeSubCluster(final ApplicationId applicationId,
      final ApplicationHomeSubCluster applicationHomeSubCluster, boolean update)
      throws YarnException {
    try {
      ApplicationHomeSubClusterPBImpl pbImpl =
          (ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster;
      byte[] serialized = pbImpl.getProto().toByteArray();
      if (!update) {
        storeApplicationStateInternal(applicationId, serialized);
      } else {
        updateApplicationStateInternal(applicationId, serialized);
      }
    } catch (Exception e) {
      throw new YarnException(e);
    }
  }

  /**
   * Stores a new application record without overwriting an existing one.
   *
   * @param applicationId application whose record is stored.
   * @param data serialized record.
   * @throws Exception if the znode path cannot be resolved or written.
   */
  protected void storeApplicationStateInternal(final ApplicationId applicationId, byte[] data)
      throws Exception {
    final String appIdStr = applicationId.toString();
    String createPath = getLeafAppIdNodePath(appIdStr, true);
    LOG.debug("Storing info for app: {} at: {}.", applicationId, createPath);
    put(createPath, data, false);
  }

  /**
   * Overwrites the record of an application.
   *
   * <p>When the znode for the configured split index does not exist, an
   * alternate path for the application is used if one is found; otherwise,
   * for a non-zero split index, the parent znode is created if missing
   * before the record is written.
   *
   * @param applicationId application whose record is updated.
   * @param data serialized record.
   * @throws Exception if a znode cannot be checked, created or written.
   */
  protected void updateApplicationStateInternal(final ApplicationId applicationId, byte[] data)
      throws Exception {
    final String appIdStr = applicationId.toString();
    String targetPath = getLeafAppIdNodePath(appIdStr, false);

    if (!exists(targetPath)) {
      AppNodeSplitInfo alternate = getAlternatePath(appIdStr);
      if (alternate != null) {
        targetPath = alternate.path;
      } else if (appIdNodeSplitIndex != 0) {
        // No alternate path exists. Create path as per configured split index.
        String parentNode = getSplitAppNodeParent(targetPath, appIdNodeSplitIndex);
        if (!exists(parentNode)) {
          zkManager.create(parentNode);
        }
      }
    }

    LOG.debug("Storing final state info for app: {} at: {}.", applicationId, targetPath);
    put(targetPath, data, true);
  }

  /**
   * Reads the membership record of a subcluster from Zookeeper.
   * @param subclusterId Subcluster identifier.
   * @return Subcluster information, or null if its znode holds no data.
   * @throws YarnException If the znode cannot be read or its content cannot
   *     be parsed.
   */
  private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
      throws YarnException {
    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());

    byte[] raw = get(memberZNode);
    if (raw == null) {
      return null;
    }

    try {
      return new SubClusterInfoPBImpl(SubClusterInfoProto.parseFrom(raw));
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse subcluster info at " + memberZNode);
    }
    return null;
  }

  /**
   * Serializes a subcluster record and writes it to its membership znode.
   * @param subclusterId Subcluster identifier.
   * @param subClusterInfo Subcluster information.
   * @param update Whether an already existing znode may be overwritten.
   * @throws YarnException If the znode cannot be written.
   */
  private void putSubclusterInfo(final SubClusterId subclusterId,
      final SubClusterInfo subClusterInfo, final boolean update)
          throws YarnException {
    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
    SubClusterInfoPBImpl pbImpl = (SubClusterInfoPBImpl) subClusterInfo;
    put(memberZNode, pbImpl.getProto().toByteArray(), update);
  }

  /**
   * Reads the policy configuration of a queue from Zookeeper.
   * @param queue Name of the queue.
   * @return Subcluster policy configuration, or null if the queue's znode
   *     holds no data.
   * @throws YarnException If the znode cannot be read or its content cannot
   *     be parsed.
   */
  private SubClusterPolicyConfiguration getPolicy(final String queue)
      throws YarnException {
    String policyZNode = getNodePath(policiesZNode, queue);

    byte[] raw = get(policyZNode);
    if (raw == null) {
      return null;
    }

    try {
      SubClusterPolicyConfigurationProto proto =
          SubClusterPolicyConfigurationProto.parseFrom(raw);
      return new SubClusterPolicyConfigurationPBImpl(proto);
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse policy at " + policyZNode);
    }
    return null;
  }

  /**
   * Serializes a queue policy configuration and writes it to its znode.
   * @param queue Name of the queue.
   * @param policy Subcluster policy configuration.
   * @param update Whether an already existing znode may be overwritten.
   * @throws YarnException If the znode cannot be written.
   */
  private void putPolicy(final String queue,
      final SubClusterPolicyConfiguration policy, boolean update)
          throws YarnException {
    String policyZNode = getNodePath(policiesZNode, queue);
    SubClusterPolicyConfigurationPBImpl pbImpl =
        (SubClusterPolicyConfigurationPBImpl) policy;
    put(policyZNode, pbImpl.getProto().toByteArray(), update);
  }

  /**
   * Reads the data held by a znode in Zookeeper.
   * @param znode Path of the znode.
   * @return Data in the znode, or null when the znode does not exist.
   * @throws YarnException If the znode cannot be checked or read.
   */
  private byte[] get(String znode) throws YarnException {
    boolean present = false;
    try {
      present = zkManager.exists(znode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot find znode " + znode);
    }

    if (present) {
      try {
        return zkManager.getData(znode);
      } catch (Exception e) {
        FederationStateStoreUtils.logAndThrowStoreException(LOG,
            "Cannot get data from znode " + znode + ": " + e.getMessage());
      }
      return null;
    }

    LOG.error("{} does not exist", znode);
    return null;
  }

  /**
   * Writes data into a znode in Zookeeper, creating the znode if needed.
   *
   * <p>When the znode was not created by this call and {@code update} is
   * false, the data is left untouched.
   *
   * @param znode Path of the znode.
   * @param data Data to write.
   * @param update Whether the data of an already existing znode may be
   *     overwritten.
   * @throws YarnException If the znode cannot be created or written.
   */
  private void put(String znode, byte[] data, boolean update)
      throws YarnException {
    boolean newlyCreated = false;
    try {
      newlyCreated = zkManager.create(znode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot create znode " + znode + ": " + e.getMessage());
    }

    if (!newlyCreated) {
      LOG.debug("{} not created", znode);
    }
    if (!newlyCreated && !update) {
      LOG.info("{} already existed and we are not updating", znode);
      return;
    }

    // Version -1 is passed through to the ZooKeeper manager unchanged.
    try {
      zkManager.setData(znode, data, -1);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot write data into znode " + znode + ": " + e.getMessage());
    }
  }

  /**
   * Returns the current time taken from a UTC calendar.
   * @return Current time in milliseconds.
   */
  private static long getCurrentTime() {
    TimeZone utc = TimeZone.getTimeZone("UTC");
    return Calendar.getInstance(utc).getTimeInMillis();
  }

  /**
   * Writes the home subcluster of a reservation to its znode.
   * @param reservationId Reservation identifier.
   * @param subClusterId Home subcluster to store.
   * @param update Whether an already existing znode may be overwritten.
   * @throws YarnException If the znode cannot be written.
   */
  private void putReservation(final ReservationId reservationId,
      final SubClusterId subClusterId, boolean update) throws YarnException {
    String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());
    SubClusterIdPBImpl pbImpl = (SubClusterIdPBImpl) subClusterId;
    put(reservationZNode, pbImpl.getProto().toByteArray(), update);
  }

  /**
   * Reads the home subcluster stored for a reservation.
   * @param reservationId Reservation identifier.
   * @return Home subcluster, or null if the reservation znode holds no data.
   * @throws YarnException If the znode cannot be read or its content cannot
   *     be parsed.
   */
  private SubClusterId getReservation(final ReservationId reservationId)
      throws YarnException {
    String reservationIdZNode = getNodePath(reservationsZNode, reservationId.toString());

    byte[] raw = get(reservationIdZNode);
    if (raw == null) {
      return null;
    }

    try {
      return new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(raw));
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse reservation at " + reservationId);
    }
    return null;
  }

  /**
   * Exposes the operation-duration metrics object of this store to tests.
   *
   * @return the {@code opDurations} instance used by this store.
   */
  @VisibleForTesting
  public ZKFederationStateStoreOpDurations getOpDurations() {
    return opDurations;
  }

  /**
   * Stores the home subcluster of a reservation unless one is already stored,
   * then returns whichever home subcluster is stored afterwards.
   *
   * @param request request carrying the reservation and its proposed home
   *     subcluster.
   * @return response holding the home subcluster read back from the store.
   * @throws YarnException if the reservation cannot be written or read back.
   */
  @Override
  public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
      AddReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
    ReservationId reservationId = reservationHomeSubCluster.getReservationId();

    // Try to write the subcluster; an existing entry is left untouched.
    SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
    try {
      putReservation(reservationId, homeSubCluster, false);
    } catch (Exception e) {
      String errMsg = "Cannot add reservation home subcluster for " + reservationId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Check for the actual subcluster
    try {
      homeSubCluster = getReservation(reservationId);
    } catch (Exception e) {
      String errMsg = "Cannot check reservation home subcluster for " + reservationId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addReservationHomeSubClusterDuration(start, end);
    return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
  }

  /**
   * Looks up the home subcluster of a reservation.
   *
   * @param request request carrying the reservation id.
   * @return response holding the reservation and its home subcluster.
   * @throws YarnException if the reservation is not stored or cannot be read.
   */
  @Override
  public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
      GetReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationId reservationId = request.getReservationId();

    SubClusterId storedHome = getReservation(reservationId);
    if (storedHome == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation " + reservationId + " does not exist");
    }

    ReservationHomeSubCluster mapping =
        ReservationHomeSubCluster.newInstance(reservationId, storedHome);
    opDurations.addGetReservationHomeSubClusterDuration(start, clock.getTime());
    return GetReservationHomeSubClusterResponse.newInstance(mapping);
  }

  /**
   * Lists every reservation stored under the reservations znode together
   * with its home subcluster.
   *
   * <p>Listing the children and reading each child are separate ZooKeeper
   * calls, so a reservation may disappear in between. Such reservations are
   * skipped with a warning instead of being returned with a {@code null}
   * home subcluster.
   *
   * @param request the list request.
   * @return response holding the reservations that could be read.
   * @throws YarnException if the reservations cannot be read.
   */
  @Override
  public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
      GetReservationsHomeSubClusterRequest request) throws YarnException {
    long start = clock.getTime();
    List<ReservationHomeSubCluster> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(reservationsZNode)) {
        ReservationId reservationId = ReservationId.parseReservationId(child);
        SubClusterId homeSubCluster = getReservation(reservationId);
        if (homeSubCluster == null) {
          // The znode vanished (or holds no data) after it was listed.
          LOG.warn("Reservation: {} does not exist.", reservationId);
          continue;
        }
        ReservationHomeSubCluster reservation =
            ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
        result.add(reservation);
      }
    } catch (Exception e) {
      String errMsg = "Cannot get reservations: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetReservationsHomeSubClusterDuration(start, end);
    return GetReservationsHomeSubClusterResponse.newInstance(result);
  }

  /**
   * Deletes the stored home subcluster of a reservation.
   *
   * @param request request carrying the reservation id.
   * @return an empty response.
   * @throws YarnException if the reservation is not stored, or its znode
   *     cannot be checked or deleted.
   */
  @Override
  public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
      DeleteReservationHomeSubClusterRequest request) throws YarnException {
    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationId reservationId = request.getReservationId();
    String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());

    boolean present = false;
    try {
      present = zkManager.exists(reservationZNode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot check reservation: " + e.getMessage());
    }
    if (!present) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation " + reservationId + " does not exist");
    }

    try {
      zkManager.delete(reservationZNode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot delete reservation: " + e.getMessage());
    }

    opDurations.addDeleteReservationHomeSubClusterDuration(start, clock.getTime());
    return DeleteReservationHomeSubClusterResponse.newInstance();
  }

  /**
   * Replaces the stored home subcluster of an existing reservation.
   *
   * @param request request carrying the reservation and its new home
   *     subcluster.
   * @return an empty response.
   * @throws YarnException if the reservation is not stored or cannot be
   *     read or written.
   */
  @Override
  public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
      UpdateReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationHomeSubCluster requested = request.getReservationHomeSubCluster();
    ReservationId reservationId = requested.getReservationId();

    // Only an already stored reservation may be updated.
    SubClusterId currentHome = getReservation(reservationId);
    if (currentHome == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation " + reservationId + " does not exist");
    }

    putReservation(reservationId, requested.getHomeSubCluster(), true);
    opDurations.addUpdateReservationHomeSubClusterDuration(start, clock.getTime());
    return UpdateReservationHomeSubClusterResponse.newInstance();
  }

  /**
   * Stores a new master key in ZooKeeper and returns the key read back from
   * the store.
   *
   * <p>An already existing znode for the same key id is not overwritten.
   *
   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
   * @throws YarnException if the call to the state store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  @Override
  public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {

    long start = clock.getTime();
    // Validation happens first; the code below uses the request and its
    // master key without further null checks.
    FederationRouterRMTokenInputValidator.validate(request);

    // Resolve the delegation key and the znode it is stored under.
    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
    String keyZNodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
    LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(),
        keyZNodePath);

    // Serialize the key and write it to zk.
    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(buffer)) {
      delegationKey.write(out);
      put(keyZNodePath, buffer.toByteArray(), false);
    }

    // Read the key back so the response reflects what is stored.
    RouterMasterKey storedKey = getRouterMasterKeyFromZK(keyZNodePath);
    opDurations.addStoreNewMasterKeyDuration(start, clock.getTime());
    return RouterMasterKeyResponse.newInstance(storedKey);
  }

  /**
   * Removes a stored master key from ZooKeeper.
   *
   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
   * @return routerMasterKeyResponse, the response contains the RouterMasterKey
   *     taken from the request.
   * @throws YarnException if the key is not stored or the call to the state
   *     store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  @Override
  public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {

    long start = clock.getTime();
    // Validation happens first; the code below uses the request and its
    // master key without further null checks.
    FederationRouterRMTokenInputValidator.validate(request);

    try {
      // Resolve the delegation key and the znode it is stored under.
      RouterMasterKey requestedKey = request.getRouterMasterKey();
      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
      String keyZNodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
      LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(),
          keyZNodePath);

      // A key that is not stored cannot be removed.
      boolean stored = exists(keyZNodePath);
      if (!stored) {
        throw new YarnException("ZkNodePath = " + keyZNodePath + " not exists!");
      }

      zkManager.delete(keyZNodePath);
      opDurations.removeStoredMasterKeyDuration(start, clock.getTime());
      return RouterMasterKeyResponse.newInstance(requestedKey);
    } catch (Exception e) {
      // Every failure, including the one raised above, is wrapped.
      throw new YarnException(e);
    }
  }

  /**
   * Looks up a stored master key by the delegation key in the request.
   *
   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
   * @return routerMasterKeyResponse, the response contains the RouterMasterKey
   *     read from ZooKeeper.
   * @throws YarnException if the key is not stored or the call to the state
   *     store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  @Override
  public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {

    long start = clock.getTime();
    // Validation happens first; the code below uses the request and its
    // master key without further null checks.
    FederationRouterRMTokenInputValidator.validate(request);

    try {
      // Resolve the delegation key and the znode it is stored under.
      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
      String keyZNodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);

      // A key that is not stored cannot be returned.
      boolean stored = exists(keyZNodePath);
      if (!stored) {
        throw new YarnException("ZkNodePath = " + keyZNodePath + " not exists!");
      }

      RouterMasterKey storedKey = getRouterMasterKeyFromZK(keyZNodePath);
      opDurations.getMasterKeyByDelegationKeyDuration(start, clock.getTime());
      return RouterMasterKeyResponse.newInstance(storedKey);
    } catch (Exception e) {
      // Every failure, including the one raised above, is wrapped.
      throw new YarnException(e);
    }
  }

  /**
   * Resolves the znode path of a master key from its DelegationKey.
   *
   * @param delegationKey delegationKey whose key id selects the znode.
   * @return masterKey ZNodePath.
   */
  private String getMasterKeyZNodePathByDelegationKey(DelegationKey delegationKey) {
    int keyId = delegationKey.getKeyId();
    return getMasterKeyZNodePathByKeyId(keyId);
  }

  /**
   * Resolves the znode path of a master key from its key id.
   *
   * <p>The znode name is the delegation-key prefix followed by the key id,
   * placed under the master keys root path.
   *
   * @param keyId master key id.
   * @return masterKey ZNodePath.
   */
  private String getMasterKeyZNodePathByKeyId(int keyId) {
    return getNodePath(routerRMDTMasterKeysRootPath,
        ROUTER_RM_DELEGATION_KEY_PREFIX + keyId);
  }

  /**
   * Reads a master key znode and converts it into a RouterMasterKey.
   *
   * @param nodePath The path where masterKey is stored in zk.
   *
   * @return RouterMasterKey, or {@code null} when the znode holds no data.
   * @throws IOException wrapping any failure raised while reading or
   *     deserializing the key.
   */
  private RouterMasterKey getRouterMasterKeyFromZK(String nodePath)
      throws IOException {
    try {
      byte[] raw = get(nodePath);
      boolean hasData = raw != null && raw.length != 0;
      if (!hasData) {
        return null;
      }

      // Deserialize the bytes into a DelegationKey.
      DelegationKey key = new DelegationKey();
      key.readFields(new DataInputStream(new ByteArrayInputStream(raw)));

      int keyId = key.getKeyId();
      ByteBuffer encodedKey = ByteBuffer.wrap(key.getEncodedKey());
      long expiryDate = key.getExpiryDate();
      return RouterMasterKey.newInstance(keyId, encodedKey, expiryDate);
    } catch (Exception ex) {
      LOG.error("No node in path {}.", nodePath);
      throw new IOException(ex);
    }
  }

  /**
   * Stores a new RMDelegationTokenIdentifier in ZooKeeper and returns the
   * token read back from the store.
   *
   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
   * @throws YarnException if the call to the state store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  @Override
  public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
      throws YarnException, IOException {

    long start = clock.getTime();
    // Validate before the request and its RouterStoreToken are used below.
    FederationRouterRMTokenInputValidator.validate(request);

    try {
      // Add the delegation token (update flag is false).
      storeOrUpdateRouterRMDT(request, false);

      // Read the token back so the response reflects what is stored.
      RouterStoreToken storedToken = getStoreTokenFromZK(request);
      opDurations.getStoreNewTokenDuration(start, clock.getTime());
      return RouterRMTokenResponse.newInstance(storedToken);
    } catch (YarnException | IOException e) {
      // Declared exception types pass through unchanged.
      throw e;
    } catch (Exception e) {
      throw new YarnException(e);
    }
  }

  /**
   * Updates a stored RMDelegationTokenIdentifier, or stores it as a new token
   * when its znode does not exist yet, and returns the token read back from
   * the store.
   *
   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
   * @throws YarnException if the call to the state store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  @Override
  public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
      throws YarnException, IOException {

    long start = clock.getTime();
    // Validate before the request and its RouterStoreToken are used below.
    FederationRouterRMTokenInputValidator.validate(request);

    try {
      // Resolve the znode the token is stored under.
      String tokenZNodePath = getStoreTokenZNodePathByTokenRequest(request);

      if (exists(tokenZNodePath)) {
        // The znode exists: overwrite the token data.
        storeOrUpdateRouterRMDT(request, true);
      } else {
        // The znode does not exist: write the token as a new one.
        storeNewToken(request);
      }

      // Read the token back so the response reflects what is stored.
      RouterStoreToken storedToken = getStoreTokenFromZK(request);
      opDurations.updateStoredTokenDuration(start, clock.getTime());
      return RouterRMTokenResponse.newInstance(storedToken);
    } catch (YarnException | IOException e) {
      // Declared exception types pass through unchanged.
      throw e;
    } catch (Exception e) {
      throw new YarnException(e);
    }
  }

  /**
   * ZookeeperFederationStateStore Supports Remove RMDelegationTokenIdentifier.
   *
   * The znode of the token must exist, otherwise a YarnException is thrown.
   * The token is read from ZK first; the znode is deleted only when that read
   * produced a token. The token that was read (possibly null) is returned in
   * the response.
   *
   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
   * @throws YarnException if the call to the state store is unsuccessful.
   * @throws IOException An IO Error occurred.
   */
  @Override
  public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
      throws YarnException, IOException {

    final long startTime = clock.getTime();

    // Fail fast when the request or the RouterStoreToken it carries is empty.
    FederationRouterRMTokenInputValidator.validate(request);

    try {
      final String tokenPath = getStoreTokenZNodePathByTokenRequest(request);

      // Removing a token that was never stored is reported as an error.
      if (!exists(tokenPath)) {
        throw new YarnException("ZkNodePath = " + tokenPath + " not exists!");
      }

      // Fetch the stored token before deleting so it can be handed back to the caller.
      final RouterStoreToken removedToken = getStoreTokenFromZK(request);
      if (removedToken != null) {
        zkManager.delete(tokenPath);
      }

      opDurations.removeStoredTokenDuration(startTime, clock.getTime());
      return RouterRMTokenResponse.newInstance(removedToken);
    } catch (YarnException | IOException declared) {
      // Already one of the declared exception types: propagate unchanged.
      throw declared;
    } catch (Exception unexpected) {
      throw new YarnException(unexpected);
    }
  }

  /**
   * The Router Supports GetTokenByRouterStoreToken.
   *
   * The znode of the token must exist, otherwise a YarnException is thrown.
   * The token stored in that znode is read and returned in the response.
   *
   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
   * @return RouterRMTokenResponse.
   * @throws YarnException if the call to the state store is unsuccessful
   * @throws IOException An IO Error occurred
   */
  @Override
  public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
      throws YarnException, IOException {

    final long startTime = clock.getTime();

    // Fail fast when the request or the RouterStoreToken it carries is empty.
    FederationRouterRMTokenInputValidator.validate(request);

    try {
      // A token can only be looked up when its znode is present.
      final String tokenPath = getStoreTokenZNodePathByTokenRequest(request);
      if (!exists(tokenPath)) {
        throw new YarnException("ZkNodePath = " + tokenPath + " not exists!");
      }

      // Load the stored token and hand it back.
      final RouterStoreToken storedToken = getStoreTokenFromZK(request);
      opDurations.getTokenByRouterStoreTokenDuration(startTime, clock.getTime());
      return RouterRMTokenResponse.newInstance(storedToken);
    } catch (YarnException | IOException declared) {
      // Already one of the declared exception types: propagate unchanged.
      throw declared;
    } catch (Exception unexpected) {
      throw new YarnException(unexpected);
    }
  }

  /**
   * Convert the MasterKey carried by a request to a DelegationKey.
   *
   * The request is not checked here: callers are expected to have validated
   * it beforehand, so that neither the request nor the RouterMasterKey inside
   * it is null.
   *
   * @param request RouterMasterKeyRequest
   * @return DelegationKey.
   */
  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) {
    return convertMasterKeyToDelegationKey(request.getRouterMasterKey());
  }

  /**
   * Convert MasterKey to DelegationKey.
   *
   * The key bytes are copied out of the buffer returned by
   * {@code masterKey.getKeyBytes()} into a fresh array. The copy is made
   * through a duplicate of that buffer so that the position of the buffer
   * handed out by the master key is not advanced.
   *
   * @param masterKey masterKey.
   * @return DelegationKey built from the key id, expiry date and key bytes.
   */
  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) {
    // A relative get() consumes the buffer it is called on. If the master key
    // hands out the same ByteBuffer instance on every call, reading it directly
    // would leave it empty for the next conversion, so read through a
    // duplicate (shared content, independent position).
    ByteBuffer keyView = masterKey.getKeyBytes().duplicate();
    byte[] keyBytes = new byte[keyView.remaining()];
    keyView.get(keyBytes);
    return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
  }

  /**
   * Tell whether a znode is present at the given path.
   *
   * @param path znode path to probe.
   * @return true when the path exists, false when it does not.
   * @throws Exception When an exception to access zk occurs.
   */
  @VisibleForTesting
  boolean exists(final String path) throws Exception {
    final boolean present = zkManager.exists(path);
    return present;
  }

  /**
   * Write the RouterStoreToken of a request to its znode, either as a new
   * entry or as an update of an existing one.
   *
   * The request is not checked here: callers are expected to have validated
   * it with FederationRouterRMTokenInputValidator beforehand.
   *
   * @param request request carrying the RouterStoreToken to persist.
   * @param isUpdate true, update the token; false, create a new token.
   * @throws Exception exception occurs.
   */
  private void storeOrUpdateRouterRMDT(RouterRMTokenRequest request,  boolean isUpdate)
      throws Exception {

    final RouterStoreToken tokenToPersist = request.getRouterStoreToken();
    final String tokenPath = getStoreTokenZNodePathByTokenRequest(request);
    LOG.debug("nodeCreatePath = {}, isUpdate = {}", tokenPath, isUpdate);

    // The token is stored in its serialized byte form.
    put(tokenPath, tokenToPersist.toByteArray(), isUpdate);
  }

  /**
   * Resolve the znode path under which the token of a request is stored.
   *
   * The request is not checked here: callers are expected to have validated
   * it with FederationRouterRMTokenInputValidator beforehand, so that neither
   * the request nor the RouterStoreToken inside it is null.
   *
   * @param request RouterRMTokenRequest.
   * @return RouterRMToken ZNode Path.
   * @throws IOException io exception occurs.
   */
  private String getStoreTokenZNodePathByTokenRequest(RouterRMTokenRequest request)
      throws IOException {
    return getStoreTokenZNodePathByIdentifier(
        request.getRouterStoreToken().getTokenIdentifier());
  }

  /**
   * Build the znode path of a token from its identifier.
   *
   * The node name is ROUTER_RM_DELEGATION_TOKEN_PREFIX followed by the
   * sequence number of the identifier, placed under
   * routerRMDelegationTokensRootPath.
   *
   * @param identifier YARNDelegationTokenIdentifier
   * @return RouterRMToken ZNode Path.
   */
  private String getStoreTokenZNodePathByIdentifier(YARNDelegationTokenIdentifier identifier) {
    return getNodePath(routerRMDelegationTokensRootPath,
        ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
  }

  /**
   * Load from ZK the RouterStoreToken addressed by the token identifier of a
   * request.
   *
   * @param request RouterRMTokenRequest.
   * @return RouterStoreToken.
   * @throws IOException io exception occurs.
   */
  private RouterStoreToken getStoreTokenFromZK(RouterRMTokenRequest request) throws IOException {
    final YARNDelegationTokenIdentifier tokenId =
        request.getRouterStoreToken().getTokenIdentifier();
    return getStoreTokenFromZK(tokenId);
  }

  /**
   * Load from ZK the RouterStoreToken addressed by a token identifier.
   *
   * @param identifier YARN DelegationToken Identifier.
   * @return RouterStoreToken.
   * @throws IOException io exception occurs.
   */
  private RouterStoreToken getStoreTokenFromZK(YARNDelegationTokenIdentifier identifier)
      throws IOException {
    // Translate the identifier into its znode path, then read that znode.
    final String tokenPath = getStoreTokenZNodePathByIdentifier(identifier);
    return getStoreTokenFromZK(tokenPath);
  }

  /**
   * Get RouterStoreToken from ZK.
   *
   * Reads the bytes stored at the given znode and deserializes them into a
   * RouterStoreToken.
   *
   * @param nodePath Znode location where data is stored.
   * @return the deserialized RouterStoreToken, or null when the znode holds
   *     no data (null or zero-length content).
   * @throws IOException if reading the znode or deserializing its content
   *     fails; the original failure is attached as the cause.
   */
  private RouterStoreToken getStoreTokenFromZK(String nodePath)
      throws IOException {
    try {
      byte[] data = get(nodePath);
      if ((data == null) || (data.length == 0)) {
        return null;
      }
      // try-with-resources closes the stream chain on every exit path.
      try (DataInputStream din = new DataInputStream(new ByteArrayInputStream(data))) {
        RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class);
        storeToken.readFields(din);
        return storeToken;
      }
    } catch (Exception ex) {
      // Any failure (ZK access or deserialization) is surfaced as an IOException.
      LOG.error("No node in path [{}]", nodePath, ex);
      throw new IOException(ex);
    }
  }

  /**
   * Hand out the next delegation token SequenceNum.
   *
   * For ZookeeperFederationStateStore, in order to reduce the interaction with ZK,
   * SequenceNums are reserved from ZK in batches of seqNumBatchSize.
   *
   * While currentSeqNum &lt; currentMaxSeqNum, the method returns ++currentSeqNum
   * without contacting ZK.
   * When currentSeqNum &gt;= currentMaxSeqNum, the shared counter delTokSeqCounter
   * is advanced by one batch; currentSeqNum is reset to the value the counter
   * held before the advance and currentMaxSeqNum to that value plus
   * seqNumBatchSize. ++currentSeqNum is then returned from the new range.
   *
   * If the thread is interrupted while reserving a batch, the interrupt flag is
   * restored, the local range is left as it was and ++currentSeqNum is still
   * returned.
   *
   * NOTE(review): this method is not declared synchronized. Confirm that
   * callers serialize access to currentSeqNum and currentMaxSeqNum.
   *
   * @return SequenceNum.
   * @throws RuntimeException if the shared counter cannot be incremented for a
   *     reason other than an interrupt.
   */
  @Override
  public int incrementDelegationTokenSeqNum() {
    // The secret manager will keep a local range of seq num which won't be
    // seen by peers, so only when the range is exhausted it will ask zk for
    // another range again
    if (currentSeqNum >= currentMaxSeqNum) {
      try {
        // after a successful batch request, we can get the range starting point
        // (incrSharedCount returns the counter value from before the increment)
        currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
        currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
        LOG.info("Fetched new range of seq num, from {} to {} ",
            currentSeqNum + 1, currentMaxSeqNum);
      } catch (InterruptedException e) {
        // The ExpirationThread is just finishing.. so don't do anything..
        // Only the interrupt flag is restored for the caller.
        LOG.debug("Thread interrupted while performing token counter increment", e);
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        throw new RuntimeException("Could not increment shared counter !!", e);
      }
    }
    // Pre-increment: the first number handed out from a range is its start + 1.
    return ++currentSeqNum;
  }

  /**
   * Add batchSize to a shared counter with a compare-and-set retry loop.
   *
   * The current versioned value is read and a conditional update to
   * value + batchSize is attempted; this is repeated until the conditional
   * update succeeds.
   *
   * @param sharedCount zk SharedCount.
   * @param batchSize batch size.
   * @return the value the counter held immediately before the successful
   *     increment.
   * @throws Exception exception occurs.
   */
  private int incrSharedCount(SharedCount sharedCount, int batchSize)
      throws Exception {
    VersionedValue<Integer> observed;
    do {
      // Re-read on every attempt: a failed attempt means the observed version is stale.
      observed = sharedCount.getVersionedValue();
    } while (!sharedCount.trySetCount(observed, observed.getValue() + batchSize));
    return observed.getValue();
  }

  /**
   * Read the current value of the shared delegation token sequence counter
   * (delTokSeqCounter).
   *
   * @return delegationTokenSeqNum.
   */
  @Override
  public int getDelegationTokenSeqNum() {
    final int sharedSeqNum = delTokSeqCounter.getCount();
    return sharedSeqNum;
  }

  /**
   * Overwrite the shared delegation token sequence counter (delTokSeqCounter).
   *
   * @param seqNum sequenceNum.
   * @throws RuntimeException if the shared counter cannot be set; the
   *     original failure is attached as the cause.
   */
  @Override
  public void setDelegationTokenSeqNum(int seqNum) {
    try {
      delTokSeqCounter.setCount(seqNum);
    } catch (Exception cause) {
      // The overridden signature declares no checked exception, so wrap it.
      throw new RuntimeException("Could not set shared counter !!", cause);
    }
  }

  /**
   * Read the current value of the shared master key id counter
   * (keyIdSeqCounter).
   *
   * @return currentKeyId.
   */
  @Override
  public int getCurrentKeyId() {
    final int sharedKeyId = keyIdSeqCounter.getCount();
    return sharedKeyId;
  }

  /**
   * The Router Supports incrementCurrentKeyId.
   *
   * Advances the shared master key id counter (keyIdSeqCounter) by one and
   * returns the id reserved by this call. The returned id is derived from the
   * result of the increment itself rather than from a later read of the
   * counter, so an increment by a peer between the two steps cannot make two
   * callers return the same id.
   *
   * If the thread is interrupted while incrementing, the interrupt flag is
   * restored and the current value of the counter is returned.
   *
   * @return CurrentKeyId.
   * @throws RuntimeException if the shared counter cannot be incremented for a
   *     reason other than an interrupt.
   */
  @Override
  public int incrementCurrentKeyId() {
    try {
      // It should be noted that the BatchSize of MasterKeyId defaults to 1.
      // incrSharedCount returns the value the counter held before the
      // successful increment, so previous + 1 is the id reserved here.
      return incrSharedCount(keyIdSeqCounter, 1) + 1;
    } catch (InterruptedException e) {
      // The ExpirationThread is just finishing.. so don't do anything..
      LOG.debug("Thread interrupted while performing Master keyId increment", e);
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      throw new RuntimeException("Could not increment shared Master keyId counter !!", e);
    }
    // Only reached after an interrupt: report the counter as currently known.
    return keyIdSeqCounter.getCount();
  }

  /**
   * Derive the parent app node path by cutting the leaf part off a full app
   * id path.
   *
   * @param appIdPath App id path for which parent needs to be returned.
   * @param splitIndex split index, i.e. the length of the leaf node name.
   * @return parent app node path.
   */
  private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
    // Drop the leaf name (splitIndex characters) plus the path separator
    // (1 character) that precedes it.
    final int parentLength = appIdPath.length() - (splitIndex + 1);
    return appIdPath.substring(0, parentLength);
  }

  /**
   * Remove the parent app node of an app id path when it no longer has any
   * leaf nodes. Called while removing application.
   *
   * Nothing is done when splitIndex is 0, when the parent node does not
   * exist, or when its children cannot be determined or are not empty.
   *
   * @param appIdPath path of app id to be removed.
   * @param splitIndex split index.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
      throws Exception {
    // A split index of 0 is handled as "no parent app node to clean up".
    if (splitIndex == 0) {
      return;
    }

    final String parentPath = getSplitAppNodeParent(appIdPath, splitIndex);

    final List<String> leafNodes;
    try {
      leafNodes = getChildren(parentPath);
    } catch (KeeperException.NoNodeException alreadyGone) {
      // The parent we wanted to delete is already deleted: nothing left to do.
      LOG.debug("Unable to remove app parent node {} as it does not exist.",
          parentPath);
      return;
    }

    // Only a parent that is known to have zero children may be deleted.
    final boolean parentIsEmpty = leafNodes != null && leafNodes.isEmpty();
    if (!parentIsEmpty) {
      return;
    }

    try {
      zkManager.delete(parentPath);
      LOG.debug("No leaf app node exists. Removing parent node {}.", parentPath);
    } catch (KeeperException.NotEmptyException gainedChildren) {
      // The parent has children after all, so it has to stay; this is not an error.
      LOG.debug("Unable to remove app parent node {} as it has children.",
          parentPath);
    }
  }

  /**
   * List the names of the child znodes of a path.
   *
   * @param path znode path whose children are requested.
   * @return the child node names as returned by the ZK manager.
   * @throws Exception When an exception to access zk occurs.
   */
  List<String> getChildren(final String path) throws Exception {
    final List<String> childNames = zkManager.getChildren(path);
    return childNames;
  }

  /**
   * Look for the znode of an app id under every tracked split index other
   * than the configured one. Used when the path according to the configured
   * split index does not exist.
   *
   * @param appId application id.
   * @return a {@link AppNodeSplitInfo} object containing the path and split
   *    index of the first existing alternate path, null when there is none.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
    for (Map.Entry<Integer, String> hierarchy : routerAppRootHierarchies.entrySet()) {
      final int candidateIndex = hierarchy.getKey();

      // The configured split index is not an "alternate" path: skip it.
      if (candidateIndex == appIdNodeSplitIndex) {
        continue;
      }

      // Pure lookup: parent nodes are not created while probing.
      final String candidatePath =
          getLeafAppIdNodePath(appId, hierarchy.getValue(), candidateIndex, false);
      if (exists(candidatePath)) {
        return new AppNodeSplitInfo(candidatePath, candidateIndex);
      }
    }
    return null;
  }

  /**
   * Compute the leaf app node path for an app id under a given root node and
   * split index.
   *
   * With a split index of 0 the app id is placed directly under the root
   * node. Otherwise the app id is cut appIdNodeSplitIdx characters before its
   * end: the head names the parent node under the root, the tail names the
   * leaf under that parent. When createParentIfNotExists is true, the parent
   * node is created if it does not exist yet.
   *
   * @param appId application id.
   * @param rootNode app root node based on split index.
   * @param appIdNodeSplitIdx split index.
   * @param createParentIfNotExists flag which determines if parent app node
   *     needs to be created(as per split) if it does not exist.
   * @return leaf app node path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private String getLeafAppIdNodePath(String appId, String rootNode,
       int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
    if (appIdNodeSplitIdx == 0) {
      return getNodePath(rootNode, appId);
    }

    final int cutPosition = appId.length() - appIdNodeSplitIdx;
    final String parentPath = getNodePath(rootNode, appId.substring(0, cutPosition));

    if (createParentIfNotExists) {
      if (!exists(parentPath)) {
        try {
          zkManager.create(parentPath);
        } catch (KeeperException.NodeExistsException createdMeanwhile) {
          // The parent appeared between the existence check and the create call.
          LOG.debug("Unable to create app parent node {} as it already exists.", parentPath);
        }
      }
    }

    return getNodePath(parentPath, appId.substring(cutPosition));
  }

  /**
   * Compute the leaf app node path for an app id using the configured split
   * index (appIdNodeSplitIndex) and the root node tracked for it. When
   * createParentIfNotExists is true, the parent app node is created if it
   * does not exist.
   *
   * @param appId application id.
   * @param createParentIfNotExists flag which determines if parent app node
   *     needs to be created(as per split) if it does not exist.
   * @return leaf app node path.
   * @throws YarnException if any problem occurs while performing ZK operation.
   */
  private String getLeafAppIdNodePath(String appId,
      boolean createParentIfNotExists) throws YarnException {
    try {
      return getLeafAppIdNodePath(appId, routerAppRootHierarchies.get(appIdNodeSplitIndex),
          appIdNodeSplitIndex, createParentIfNotExists);
    } catch (Exception cause) {
      // Every failure is reported to the caller as a YarnException.
      throw new YarnException(cause);
    }
  }

  /**
   * Load the ApplicationHomeSubCluster stored in an app znode.
   *
   * @param appNodePath path of the app znode to read.
   * @return the parsed ApplicationHomeSubCluster, or null when the znode
   *     holds no data.
   * @throws Exception if the znode cannot be read, or if its content cannot
   *     be parsed (reported through
   *     FederationStateStoreUtils.logAndThrowStoreException).
   */
  private ApplicationHomeSubCluster loadRouterAppStateFromAppNode(String appNodePath)
      throws Exception {
    final byte[] rawState = get(appNodePath);
    LOG.debug("Loading application from znode: {}", appNodePath);

    // A znode without data yields no application.
    if (rawState == null) {
      return null;
    }

    try {
      return new ApplicationHomeSubClusterPBImpl(
          ApplicationHomeSubClusterProto.parseFrom(rawState));
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse application at " + appNodePath);
    }

    // Reached only if the call above returns instead of throwing.
    return null;
  }

  /**
   * Load every application stored under the tracked app root hierarchies.
   *
   * Split indices 0 to 4 are visited. For split index 0 the app znodes sit
   * directly under the root; for the other indices each child of the root is
   * a parent node whose children are the app znodes. Znodes that yield no
   * application (no data) are skipped, so the returned list never contains
   * null elements.
   *
   * As a side effect, a hierarchy other than the configured one
   * (appIdNodeSplitIndex) in which no app node was found is removed from
   * routerAppRootHierarchies.
   *
   * @return the applications that were loaded; possibly empty, never null.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private List<ApplicationHomeSubCluster> loadRouterApplications() throws Exception {
    List<ApplicationHomeSubCluster> applicationHomeSubClusters = new ArrayList<>();
    // NOTE(review): 4 is presumably the largest supported split index; confirm.
    for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
      String appRoot = routerAppRootHierarchies.get(splitIndex);
      if (appRoot == null) {
        continue;
      }
      List<String> childNodes = getChildren(appRoot);
      if (childNodes == null) {
        // Children could not be determined: leave this hierarchy untouched.
        LOG.debug("No children returned for app root {}.", appRoot);
        continue;
      }
      boolean appNodeFound = false;
      for (String childNodeName : childNodes) {
        if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
          appNodeFound = true;
          String childNodePath = getNodePath(appRoot, childNodeName);
          if (splitIndex == 0) {
            addRouterAppIfPresent(applicationHomeSubClusters, childNodePath);
          } else {
            // If AppId Node is partitioned.
            List<String> leafNodes = getChildren(childNodePath);
            if (leafNodes != null) {
              for (String leafNodeName : leafNodes) {
                addRouterAppIfPresent(applicationHomeSubClusters,
                    getNodePath(childNodePath, leafNodeName));
              }
            }
          }
        } else if (!childNodeName.equals(ROUTER_APP_ROOT_HIERARCHIES)){
          LOG.debug("Unknown child node with name {} under {}.", childNodeName, appRoot);
        }
      }
      if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
        // If no loaded app exists for a particular split index and the split
        // index for which apps are being loaded is not the one configured, then
        // we do not need to keep track of this hierarchy for storing/updating/
        // removing app/app attempt znodes.
        routerAppRootHierarchies.remove(splitIndex);
      }
    }
    return applicationHomeSubClusters;
  }

  /**
   * Load the application stored at a znode and append it to the given list,
   * unless the znode yields no application.
   *
   * @param apps list receiving the loaded application.
   * @param appNodePath path of the app znode to read.
   * @throws Exception if the znode cannot be read or parsed.
   */
  private void addRouterAppIfPresent(List<ApplicationHomeSubCluster> apps,
      String appNodePath) throws Exception {
    ApplicationHomeSubCluster app = loadRouterAppStateFromAppNode(appNodePath);
    if (app == null) {
      // loadRouterAppStateFromAppNode returns null for a znode without data.
      LOG.debug("Skipping znode {} as it holds no application data.", appNodePath);
      return;
    }
    apps.add(app);
  }

  /**
   * Replace the operation-duration holder with the one returned by its own
   * resetOpDurations() call.
   */
  @VisibleForTesting
  public void resetOpDurations() {
    final ZKFederationStateStoreOpDurations freshDurations = opDurations.resetOpDurations();
    opDurations = freshDurations;
  }
}