ZKRMStateStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * {@link RMStateStore} implementation backed by ZooKeeper.
 *
 * The znode structure is as follows:
 * ROOT_DIR_PATH
 * |--- VERSION_INFO
 * |--- EPOCH_NODE
 * |--- RM_ZK_FENCING_LOCK
 * |--- RM_APP_ROOT
 * |     |----- HIERARCHIES
 * |     |        |----- 1
 * |     |        |      |----- (#ApplicationId barring last character)
 * |     |        |      |       |----- (#Last character of ApplicationId)
 * |     |        |      |       |       |----- (#ApplicationAttemptIds)
 * |     |        |      ....
 * |     |        |
 * |     |        |----- 2
 * |     |        |      |----- (#ApplicationId barring last 2 characters)
 * |     |        |      |       |----- (#Last 2 characters of ApplicationId)
 * |     |        |      |       |       |----- (#ApplicationAttemptIds)
 * |     |        |      ....
 * |     |        |
 * |     |        |----- 3
 * |     |        |      |----- (#ApplicationId barring last 3 characters)
 * |     |        |      |       |----- (#Last 3 characters of ApplicationId)
 * |     |        |      |       |       |----- (#ApplicationAttemptIds)
 * |     |        |      ....
 * |     |        |
 * |     |        |----- 4
 * |     |        |      |----- (#ApplicationId barring last 4 characters)
 * |     |        |      |       |----- (#Last 4 characters of ApplicationId)
 * |     |        |      |       |       |----- (#ApplicationAttemptIds)
 * |     |        |      ....
 * |     |        |
 * |     |----- (#ApplicationId1)
 * |     |        |----- (#ApplicationAttemptIds)
 * |     |
 * |     |----- (#ApplicationId2)
 * |     |       |----- (#ApplicationAttemptIds)
 * |     ....
 * |
 * |--- RM_DT_SECRET_MANAGER_ROOT
 *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
 *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
 *        |       |----- 1
 *        |       |      |----- (#TokenId barring last character)
 *        |       |      |       |----- (#Last character of TokenId)
 *        |       |      ....
 *        |       |----- 2
 *        |       |      |----- (#TokenId barring last 2 characters)
 *        |       |      |       |----- (#Last 2 characters of TokenId)
 *        |       |      ....
 *        |       |----- 3
 *        |       |      |----- (#TokenId barring last 3 characters)
 *        |       |      |       |----- (#Last 3 characters of TokenId)
 *        |       |      ....
 *        |       |----- 4
 *        |       |      |----- (#TokenId barring last 4 characters)
 *        |       |      |       |----- (#Last 4 characters of TokenId)
 *        |       |      ....
 *        |       |----- Token_1
 *        |       |----- Token_2
 *        |       ....
 *        |
 *        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
 *        |      |----- Key_1
 *        |      |----- Key_2
 *                ....
 * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
 *        |----- currentMasterKey
 *        |----- nextMasterKey
 *
 * |-- RESERVATION_SYSTEM_ROOT
 *        |------PLAN_1
 *        |      |------ RESERVATION_1
 *        |      |------ RESERVATION_2
 *        |      ....
 *        |------PLAN_2
 *        ....
 * |-- PROXY_CA_ROOT
 *        |----- caCert
 *        |----- caPrivateKey
 *
 * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state is saved
 * separately, storing the currentMasterKey and nextMasterKey. Also,
 * AMRMToken has been removed from ApplicationAttemptState.
 *
 * Changes from 1.2 to 1.3 - Addition of ReservationSystem state.
 *
 * Changes from 1.3 to 1.4 - Change the structure of application znode by
 * splitting it in 2 parts, depending on a configurable split index. This limits
 * the number of application znodes returned in a single call while loading
 * app state.
 *
 * Changes from 1.4 to 1.5 - Change the structure of delegation token znode by
 * splitting it in 2 parts, depending on a configurable split index. This limits
 * the number of delegation token znodes returned in a single call while loading
 * tokens state.
 */
@Private
@Unstable
public class ZKRMStateStore extends RMStateStore {
  private static final Logger LOG =
      LoggerFactory.getLogger(ZKRMStateStore.class);

  // Child znode names under RM_DT_SECRET_MANAGER_ROOT (paths are built in
  // initInternal).
  private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
      "RMDelegationTokensRoot";
  private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
      "RMDTSequentialNumber";
  private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
      "RMDTMasterKeysRoot";
  /** Name of the store's root znode, created under the working path. */
  @VisibleForTesting
  public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
  /** Schema version written by storeVersion; history is in the class doc. */
  protected static final Version CURRENT_VERSION_INFO = Version
      .newInstance(1, 5);
  /** Name of the znode under RM_APP_ROOT that holds the split app layouts. */
  @VisibleForTesting
  public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";

  /* Znode paths */
  private String zkRootNodePath;
  private String rmAppRoot;
  // Split index -> parent znode for app znodes stored with that split.
  // Entries that hold no apps (and are not the configured index) are removed
  // while loading state.
  private Map<Integer, String> rmAppRootHierarchies;
  // Split index -> parent znode for delegation token znodes stored with that
  // split. Pruned while loading state in the same way as the app map.
  private Map<Integer, String> rmDelegationTokenHierarchies;
  private String rmDTSecretManagerRoot;
  private String dtMasterKeysRootPath;
  private String delegationTokensRootPath;
  private String dtSequenceNumberPath;
  private String amrmTokenSecretManagerRoot;
  private String reservationRoot;
  private String proxyCARoot;

  @VisibleForTesting
  protected String znodeWorkingPath;
  // Configured split indices; initInternal resets out-of-range values to the
  // configuration defaults.
  private int appIdNodeSplitIndex = 0;
  @VisibleForTesting
  protected int delegationTokenNodeSplitIndex = 0;

  /* Fencing related variables */
  private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
  private String fencingNodePath;
  // Started by startInternal only when HA is enabled and automatic failover
  // is disabled.
  private Thread verifyActiveStatusThread;
  private int zkSessionTimeout;
  // Maximum serialized application state size in bytes; larger app state is
  // rejected with StoreLimitException when stored.
  private int zknodeLimit;

  /* ACL and auth info */
  private List<ACL> zkAcl;
  // ACL applied to the root znode when HA is enabled (see setRootNodeAcls).
  @VisibleForTesting
  List<ACL> zkRootNodeAcl;
  // Digest user name (this RM's address) set by constructZkRootNodeACL.
  private String zkRootNodeUsername;

  private static final int CREATE_DELETE_PERMS =
      ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
  private final String zkRootNodeAuthScheme =
      new DigestAuthenticationProvider().getScheme();

  /** Manager for the ZooKeeper connection. */
  private ZKCuratorManager zkManager;

  // Time source for operation-duration metrics. Volatile, presumably so it
  // can be replaced (e.g. in tests) — TODO confirm.
  private volatile Clock clock = SystemClock.getInstance();
  @VisibleForTesting
  protected ZKRMStateStoreOpDurations opDurations;

  /**
   * Kinds of application attempt operations performed against the store.
   */
  private enum AppAttemptOp {
    STORE, UPDATE, REMOVE
  }

  /**
   * Immutable pairing of a znode path with the split index that determines
   * where it lives in the hierarchical znode layout.
   */
  private static final class ZnodeSplitInfo {
    /** The znode path. */
    private final String path;
    /** Split index associated with {@link #path}. */
    private final int splitIndex;

    ZnodeSplitInfo(String znodePath, int index) {
      path = znodePath;
      splitIndex = index;
    }
  }

  /**
   * Builds the ACL list for the store's root znode from the ACLs used for
   * general ZooKeeper access (sourceACLs).
   *
   * Every identity in sourceACLs keeps its permissions minus CREATE and
   * DELETE. One extra digest identity, derived from this RM's address and
   * the RM's root-node password, is the only one granted CREATE and DELETE.
   * As a side effect, {@code zkRootNodeUsername} is set to that address.
   *
   * To be called only when HA is enabled and the configuration doesn't set an
   * ACL for the root node.
   * @param conf the configuration
   * @param sourceACLs the source ACLs
   * @return ACLs for the store's root node
   * @throws java.security.NoSuchAlgorithmException thrown if the digest
   * algorithm used by Zookeeper cannot be found
   */
  @VisibleForTesting
  @Private
  @Unstable
  protected List<ACL> constructZkRootNodeACL(Configuration conf,
      List<ACL> sourceACLs) throws NoSuchAlgorithmException {
    final List<ACL> rootAcls = new ArrayList<>(sourceACLs.size() + 1);
    // Strip CREATE/DELETE from everyone else so only this RM holds them.
    sourceACLs.forEach(sourceAcl -> rootAcls.add(new ACL(
        ZKUtil.removeSpecificPerms(sourceAcl.getPerms(), CREATE_DELETE_PERMS),
        sourceAcl.getId())));

    zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
        conf);
    final String credentials =
        zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword();
    final Id exclusiveRmId = new Id(zkRootNodeAuthScheme,
        DigestAuthenticationProvider.generateDigest(credentials));
    rootAcls.add(new ACL(CREATE_DELETE_PERMS, exclusiveRmId));
    return rootAcls;
  }

  /**
   * Initializes znode paths, ACLs, size limits and split indices from the
   * configuration, and obtains the ZooKeeper manager from the RM, creating
   * one if the RM does not provide it.
   *
   * @param conf the configuration
   * @throws IOException propagated from ACL configuration handling or from
   *         ZooKeeper manager creation
   * @throws NoSuchAlgorithmException if the digest algorithm needed to build
   *         the root node ACL is unavailable
   */
  @Override
  public synchronized void initInternal(Configuration conf)
      throws IOException, NoSuchAlgorithmException {
    /* Initialize fencing related paths, acls, and ops */
    znodeWorkingPath =
        conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
            YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
    zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
    rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
    // Split app znodes live under RM_APP_ROOT/HIERARCHIES/<splitIndex>.
    rmAppRootHierarchies = buildSplitHierarchies(rmAppRoot,
        getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));

    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
    zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
    zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
        YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);

    appIdNodeSplitIndex = getValidatedSplitIndex(conf,
        YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
        YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);

    opDurations = ZKRMStateStoreOpDurations.getInstance();

    zkAcl = ZKCuratorManager.getZKAcls(conf);

    if (HAUtil.isHAEnabled(conf)) {
      String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
          (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
      if (zkRootNodeAclConf != null) {
        zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);

        try {
          zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
        } catch (ZKUtil.BadAclFormatException bafe) {
          LOG.error("Invalid format for "
              + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
          throw bafe;
        }
      } else {
        // No explicit root ACL configured: derive one that gives this RM
        // exclusive create/delete rights.
        zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
      }
    }

    rmDTSecretManagerRoot =
        getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
    dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
        RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
    delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
        RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
    // Split token znodes live directly under the tokens root, in
    // <delegationTokensRootPath>/<splitIndex>.
    rmDelegationTokenHierarchies = buildSplitHierarchies(
        delegationTokensRootPath, delegationTokensRootPath);
    dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
        RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
    amrmTokenSecretManagerRoot =
        getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
    proxyCARoot = getNodePath(zkRootNodePath, PROXY_CA_ROOT);
    reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
    zkManager = resourceManager.getZKManager();
    if (zkManager == null) {
      // The RM did not provide a shared manager; create one for this store
      // (closeInternal closes the Curator client in this case).
      zkManager = resourceManager.createAndStartZKManager(conf);
    }
    delegationTokenNodeSplitIndex = getValidatedSplitIndex(conf,
        YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
        YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
  }

  /**
   * Builds a mutable map from split index to parent znode for a hierarchical
   * layout. Index 0 maps to {@code flatRoot}; indices 1 to 4 map to
   * {@code splitParent/<index>}. The map must stay mutable because unused
   * hierarchies are removed while loading state.
   *
   * @param flatRoot parent znode used when no split is applied
   * @param splitParent znode under which the numbered split roots live
   * @return map of split index to parent znode path
   */
  private Map<Integer, String> buildSplitHierarchies(String flatRoot,
      String splitParent) {
    Map<Integer, String> hierarchies = new HashMap<>(5);
    hierarchies.put(0, flatRoot);
    for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
      hierarchies.put(splitIndex,
          getNodePath(splitParent, Integer.toString(splitIndex)));
    }
    return hierarchies;
  }

  /**
   * Reads a split-index setting. Values outside the supported range [0, 4]
   * are reported and replaced with {@code defaultValue}.
   *
   * @param conf the configuration
   * @param key configuration key to read
   * @param defaultValue value used when the key is unset or out of range
   * @return a split index in [0, 4]
   */
  private static int getValidatedSplitIndex(Configuration conf, String key,
      int defaultValue) {
    int splitIndex = conf.getInt(key, defaultValue);
    if (splitIndex < 0 || splitIndex > 4) {
      LOG.warn("Invalid value {} for config {} specified. Resetting it to {}",
          splitIndex, key, defaultValue);
      return defaultValue;
    }
    return splitIndex;
  }

  /**
   * Creates the store's znode tree and applies the root ACLs.
   *
   * Parents are always created before their children. When HA is enabled
   * with automatic failover disabled, the active-status verifier thread is
   * also started.
   *
   * @throws Exception if any ZooKeeper operation fails
   */
  @Override
  public synchronized void startInternal() throws Exception {
    // Working path and root znode first, then lock the root down and clear
    // any leftover fencing node.
    zkManager.createRootDirRecursively(znodeWorkingPath, zkAcl);
    create(zkRootNodePath);
    setRootNodeAcls();
    delete(fencingNodePath);

    final Configuration conf = getConfig();
    final boolean haWithoutAutoFailover = HAUtil.isHAEnabled(conf)
        && !HAUtil.isAutomaticFailoverEnabled(conf);
    if (haWithoutAutoFailover) {
      verifyActiveStatusThread = new VerifyActiveStatusThread();
      verifyActiveStatusThread.start();
    }

    // Application root plus its numbered split hierarchies.
    create(rmAppRoot);
    create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
    createSplitHierarchyZnodes(rmAppRootHierarchies);

    // Delegation token secret manager subtree.
    create(rmDTSecretManagerRoot);
    create(dtMasterKeysRootPath);
    create(delegationTokensRootPath);
    createSplitHierarchyZnodes(rmDelegationTokenHierarchies);
    create(dtSequenceNumberPath);

    create(amrmTokenSecretManagerRoot);
    create(reservationRoot);
    create(proxyCARoot);
  }

  /**
   * Creates the znodes registered for split indices 1 to 4 in the given
   * hierarchy map, in ascending order.
   *
   * @param hierarchies map of split index to parent znode path
   * @throws Exception if any ZooKeeper operation fails
   */
  private void createSplitHierarchyZnodes(Map<Integer, String> hierarchies)
      throws Exception {
    for (int index = 1; index <= 4; index++) {
      create(hierarchies.get(index));
    }
  }

  /**
   * Logs, at debug level, the ACLs currently set on the store's root znode
   * followed by the znode's {@link Stat}, preceded by {@code prefix}.
   *
   * @param prefix text placed at the start of the log line
   * @throws Exception if reading the ACLs from ZooKeeper fails
   */
  private void logRootNodeAcls(String prefix) throws Exception {
    // Fill the Stat from the same getACL call. A freshly constructed,
    // never-populated Stat would only ever log zeroed fields.
    Stat rootStat = new Stat();
    List<ACL> rootAcls = zkManager.getCurator().getACL()
        .storingStatIn(rootStat).forPath(zkRootNodePath);

    StringBuilder builder = new StringBuilder(prefix);
    for (ACL acl : rootAcls) {
      builder.append(acl);
    }
    builder.append(rootStat);
    LOG.debug("{}", builder);
  }

  /**
   * Applies ACLs to the store's root znode. When HA is enabled the
   * dedicated root-node ACL is used; otherwise the general ZooKeeper ACL is
   * used. With debug logging on, the ACLs are logged before and after.
   *
   * @throws Exception if the ZooKeeper ACL update fails
   */
  private void setRootNodeAcls() throws Exception {
    if (LOG.isDebugEnabled()) {
      logRootNodeAcls("Before setting ACLs\n");
    }

    List<ACL> rootAcls =
        HAUtil.isHAEnabled(getConfig()) ? zkRootNodeAcl : zkAcl;
    zkManager.getCurator().setACL().withACL(rootAcls).forPath(zkRootNodePath);

    if (LOG.isDebugEnabled()) {
      logRootNodeAcls("After setting ACLs\n");
    }
  }

  /**
   * Stops the active-status verifier thread, if one was started. If this
   * store created its own ZooKeeper manager, also closes that manager's
   * Curator client.
   *
   * @throws Exception if interrupted while waiting for the verifier thread
   */
  @Override
  protected synchronized void closeInternal() throws Exception {
    if (verifyActiveStatusThread != null) {
      verifyActiveStatusThread.interrupt();
      // Bounded wait so shutdown does not block indefinitely on the thread.
      verifyActiveStatusThread.join(1000);
    }

    // Close only a client this store created (the RM returned no shared
    // manager). zkManager can still be null if initInternal failed before
    // assigning it, e.g. on a malformed root-node ACL.
    if (resourceManager.getZKManager() == null && zkManager != null) {
      CuratorFramework curatorFramework = zkManager.getCurator();
      IOUtils.closeStream(curatorFramework);
    }
  }

  /**
   * Returns the state-store schema version of this implementation,
   * {@link #CURRENT_VERSION_INFO}, which is also what storeVersion persists.
   *
   * @return the current state-store version
   */
  @Override
  protected Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }

  /**
   * Persists {@link #CURRENT_VERSION_INFO} to the version znode. If the znode
   * is absent it is created; otherwise its data is overwritten regardless of
   * the znode's data version (-1). Both paths go through the "safe"
   * ZKCuratorManager operations with {@code fencingNodePath}.
   *
   * @throws Exception if the ZooKeeper write fails
   */
  @Override
  protected synchronized void storeVersion() throws Exception {
    final String versionPath = getNodePath(zkRootNodePath, VERSION_NODE);
    final VersionProto versionProto =
        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto();
    final byte[] payload = versionProto.toByteArray();

    if (!exists(versionPath)) {
      zkManager.safeCreate(versionPath, payload, zkAcl, CreateMode.PERSISTENT,
          zkAcl, fencingNodePath);
      return;
    }
    zkManager.safeSetData(versionPath, payload, -1, zkAcl, fencingNodePath);
  }

  /**
   * Reads the state-store version persisted in the version znode.
   *
   * @return the stored version, or {@code null} if the version znode does
   *         not exist
   * @throws Exception if reading or parsing fails
   */
  @Override
  protected synchronized Version loadVersion() throws Exception {
    final String versionPath = getNodePath(zkRootNodePath, VERSION_NODE);
    if (!exists(versionPath)) {
      return null;
    }
    return new VersionPBImpl(VersionProto.parseFrom(getData(versionPath)));
  }

  /**
   * Returns the current epoch and persists its successor.
   *
   * If the epoch znode does not exist yet, {@code baseEpoch} is returned and
   * the znode is created holding {@code nextEpoch(baseEpoch)}. Otherwise the
   * stored epoch is returned and the znode is overwritten with its successor.
   *
   * @return the epoch value before the increment
   * @throws Exception if reading, parsing or writing the epoch fails
   */
  @Override
  public synchronized long getAndIncrementEpoch() throws Exception {
    final String epochPath = getNodePath(zkRootNodePath, EPOCH_NODE);
    final boolean epochZnodeExists = exists(epochPath);

    long epochValue = baseEpoch;
    if (epochZnodeExists) {
      final Epoch stored =
          new EpochPBImpl(EpochProto.parseFrom(getData(epochPath)));
      epochValue = stored.getEpoch();
    }

    // The successor payload is identical for the update and create paths.
    final byte[] successor =
        Epoch.newInstance(nextEpoch(epochValue)).getProto().toByteArray();
    if (epochZnodeExists) {
      zkManager.safeSetData(epochPath, successor, -1, zkAcl, fencingNodePath);
    } else {
      zkManager.safeCreate(epochPath, successor, zkAcl, CreateMode.PERSISTENT,
          zkAcl, fencingNodePath);
    }
    return epochValue;
  }

  /**
   * Loads the complete RM state from ZooKeeper and records how long the load
   * took in {@code opDurations}.
   *
   * @return the recovered state
   * @throws Exception if any part of the state cannot be loaded
   */
  @Override
  public synchronized RMState loadState() throws Exception {
    final long startTime = clock.getTime();
    final RMState state = new RMState();

    loadRMDTSecretManagerState(state);       // delegation token secret mgr
    loadRMAppState(state);                   // applications and attempts
    loadAMRMTokenSecretManagerState(state);  // AMRM token secret manager
    loadReservationSystemState(state);       // reservation system
    loadProxyCAManagerState(state);          // proxy CA manager

    opDurations.addLoadStateCallDuration(clock.getTime() - startTime);
    return state;
  }

  /**
   * Loads reservation allocations stored under the reservation root.
   *
   * Each child of the root is named after a plan. Each child of a plan znode
   * is named after a reservation id, and its data is a serialized
   * {@link ReservationAllocationStateProto}.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or parsing fails
   */
  private void loadReservationSystemState(RMState rmState) throws Exception {
    for (String planName : getChildren(reservationRoot)) {
      LOG.debug("Loading plan from znode: {}", planName);
      String planNodePath = getNodePath(reservationRoot, planName);

      for (String reservationNodeName : getChildren(planNodePath)) {
        String reservationNodePath =
            getNodePath(planNodePath, reservationNodeName);
        LOG.debug("Loading reservation from znode: {}", reservationNodePath);

        ReservationAllocationStateProto allocationState =
            ReservationAllocationStateProto.parseFrom(
                getData(reservationNodePath));
        ReservationId reservationId =
            ReservationId.parseReservationId(reservationNodeName);
        // The plan entry is created lazily, so plans without any reservation
        // do not appear in the recovered state.
        rmState.getReservationState()
            .computeIfAbsent(planName, plan -> new HashMap<>())
            .put(reservationId, allocationState);
      }
    }
  }

  /**
   * Loads the AMRM token secret manager's current and next master keys from
   * the data of the AMRM token secret manager root znode. If that znode has
   * no data, a warning is logged and the state is left unset.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or parsing fails
   */
  private void loadAMRMTokenSecretManagerState(RMState rmState)
      throws Exception {
    final byte[] rawState = getData(amrmTokenSecretManagerRoot);
    if (rawState == null) {
      LOG.warn("There is no data saved");
      return;
    }
    final AMRMTokenSecretManagerStatePBImpl stored =
        new AMRMTokenSecretManagerStatePBImpl(
            AMRMTokenSecretManagerStateProto.parseFrom(rawState));
    rmState.amrmTokenSecretManagerState =
        AMRMTokenSecretManagerState.newInstance(
            stored.getCurrentMasterKey(), stored.getNextMasterKey());
  }

  /**
   * Loads the RM delegation token secret manager state in three steps: the
   * master keys, then the token sequence number, then the tokens.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or deserialization fails
   */
  private synchronized void loadRMDTSecretManagerState(RMState rmState)
      throws Exception {
    loadRMDelegationKeyState(rmState);
    loadRMSequentialNumberState(rmState);
    loadRMDelegationTokenState(rmState);
  }

  /**
   * Loads delegation master keys stored under the master keys root.
   *
   * Only children whose names start with {@code DELEGATION_KEY_PREFIX} are
   * read. Key znodes without data are logged and skipped.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or deserialization fails
   */
  private void loadRMDelegationKeyState(RMState rmState) throws Exception {
    List<String> childNodes = getChildren(dtMasterKeysRootPath);

    for (String childNodeName : childNodes) {
      // Filter by name first so unrelated children cost no ZooKeeper read.
      if (!childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
        continue;
      }
      String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
      byte[] childData = getData(childNodePath);

      if (childData == null) {
        LOG.warn("Content of {} is broken.", childNodePath);
        continue;
      }

      try (DataInputStream fsIn =
          new DataInputStream(new ByteArrayInputStream(childData))) {
        DelegationKey key = new DelegationKey();
        key.readFields(fsIn);
        rmState.rmSecretManagerState.masterKeyState.add(key);

        LOG.debug("Loaded delegation key: keyId={}, expirationDate={}",
            key.getKeyId(), key.getExpiryDate());
      }
    }
  }

  /**
   * Loads the delegation token sequence number, stored as a single
   * {@code int} in the sequence number znode. Nothing is loaded if the znode
   * has no data.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or decoding fails
   */
  private void loadRMSequentialNumberState(RMState rmState) throws Exception {
    final byte[] rawSequence = getData(dtSequenceNumberPath);
    if (rawSequence == null) {
      return;
    }
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(rawSequence))) {
      rmState.rmSecretManagerState.dtSequenceNumber = in.readInt();
    }
  }

  /**
   * Loads delegation tokens from every token hierarchy (split index 0 to 4).
   *
   * For split index 0, token znodes are direct children of the tokens root.
   * For split index N > 0, each child whose name starts with
   * {@code DELEGATION_TOKEN_PREFIX} is an intermediate parent znode. Its
   * children are the token leaf znodes, and each is loaded.
   *
   * After scanning a hierarchy, it is removed from
   * {@code rmDelegationTokenHierarchies} if it held no tokens and is not the
   * configured split index. Index 0 can be removed this way too.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or deserialization fails
   */
  private void loadRMDelegationTokenState(RMState rmState) throws Exception {
    for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
      String tokenRoot = rmDelegationTokenHierarchies.get(splitIndex);
      if (tokenRoot == null) {
        continue;
      }
      List<String> childNodes = getChildren(tokenRoot);
      boolean dtNodeFound = false;
      for (String childNodeName : childNodes) {
        if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
          dtNodeFound = true;
          String parentNodePath = getNodePath(tokenRoot, childNodeName);
          if (splitIndex == 0) {
            loadDelegationTokenFromNode(rmState, parentNodePath);
          } else {
            // If znode is partitioned.
            List<String> leafNodes = getChildren(parentNodePath);
            for (String leafNodeName : leafNodes) {
              loadDelegationTokenFromNode(rmState,
                  getNodePath(parentNodePath, leafNodeName));
            }
          }
        } else if (splitIndex == 0
            && !(childNodeName.equals("1") || childNodeName.equals("2")
            || childNodeName.equals("3") || childNodeName.equals("4"))) {
          // Under the split index 0 root, the children "1".."4" are the
          // split hierarchy roots themselves, so they are not reported.
          LOG.debug("Unknown child node with name {} under {}",
              childNodeName, tokenRoot);
        }
      }
      if (splitIndex != delegationTokenNodeSplitIndex && !dtNodeFound) {
        // If no loaded delegation token exists for a particular split index and
        // the split index for which tokens are being loaded is not the one
        // configured, then we do not need to keep track of this hierarchy for
        // storing/updating/removing delegation token znodes.
        rmDelegationTokenHierarchies.remove(splitIndex);
      }
    }
  }

  /**
   * Deserializes one delegation token znode and records the token identifier
   * with its renew date. If the znode has no data, a warning is logged and
   * the znode is skipped.
   *
   * @param rmState state to populate
   * @param path full path of the token znode
   * @throws Exception if reading from ZooKeeper or deserialization fails
   */
  private void loadDelegationTokenFromNode(RMState rmState, String path)
      throws Exception {
    final byte[] tokenBytes = getData(path);
    if (tokenBytes == null) {
      LOG.warn("Content of " + path + " is broken.");
      return;
    }
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(tokenBytes))) {
      final RMDelegationTokenIdentifierData tokenData =
          RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
      final RMDelegationTokenIdentifier tokenId =
          tokenData.getTokenIdentifier();
      final long renewDate = tokenData.getRenewDate();
      rmState.rmSecretManagerState.delegationTokenState
          .put(tokenId, renewDate);
      LOG.debug("Loaded RMDelegationTokenIdentifier: {} renewDate={}",
          tokenId, renewDate);
    }
  }

  /**
   * Loads one application from its znode, checks that the application id
   * derived from the znode name matches the stored one, and then loads the
   * application's attempts.
   *
   * @param rmState state to populate
   * @param appNodePath full path of the application znode
   * @param appIdStr application id reconstructed from the znode name(s)
   * @throws YarnRuntimeException if the stored application id differs from
   *         {@code appIdStr}
   * @throws Exception if reading from ZooKeeper or parsing fails
   */
  private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
      String appIdStr) throws Exception {
    byte[] appData = getData(appNodePath);
    LOG.debug("Loading application from znode: {}", appNodePath);
    ApplicationId appId = ApplicationId.fromString(appIdStr);
    ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(
        ApplicationStateDataProto.parseFrom(appData));
    ApplicationId storedAppId =
        appState.getApplicationSubmissionContext().getApplicationId();
    if (!appId.equals(storedAppId)) {
      // Report the path and both ids so the mismatching znode can be found.
      throw new YarnRuntimeException("The node name is different from the "
          + "application id: znode " + appNodePath + " is named for " + appId
          + " but contains " + storedAppId);
    }
    rmState.appState.put(appId, appState);
    loadApplicationAttemptState(appState, appNodePath);
  }

  /**
   * Loads all applications from every app hierarchy (split index 0 to 4).
   *
   * For split index 0, app znodes are direct children of RM_APP_ROOT. For
   * split index N > 0, each child of the hierarchy root is an application id
   * without its last N characters. Its children hold those last N
   * characters, and the full application id is the two names concatenated.
   *
   * After scanning a hierarchy, it is removed from
   * {@code rmAppRootHierarchies} if it held no apps and is not the
   * configured split index.
   *
   * @param rmState state to populate
   * @throws Exception if reading from ZooKeeper or parsing fails
   */
  private synchronized void loadRMAppState(RMState rmState) throws Exception {
    for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
      String appRoot = rmAppRootHierarchies.get(splitIndex);
      if (appRoot == null) {
        continue;
      }
      List<String> childNodes = getChildren(appRoot);
      boolean appNodeFound = false;
      for (String childNodeName : childNodes) {
        if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
          appNodeFound = true;
          if (splitIndex == 0) {
            loadRMAppStateFromAppNode(rmState,
                getNodePath(appRoot, childNodeName), childNodeName);
          } else {
            // If AppId Node is partitioned.
            String parentNodePath = getNodePath(appRoot, childNodeName);
            List<String> leafNodes = getChildren(parentNodePath);
            for (String leafNodeName : leafNodes) {
              // Parent name (id prefix) + leaf name (id suffix) = full id.
              String appIdStr = childNodeName + leafNodeName;
              loadRMAppStateFromAppNode(rmState,
                  getNodePath(parentNodePath, leafNodeName), appIdStr);
            }
          }
        } else if (!childNodeName.equals(RM_APP_ROOT_HIERARCHIES)){
          // The HIERARCHIES znode is an expected child of RM_APP_ROOT (the
          // split index 0 root), so it is not reported.
          LOG.debug("Unknown child node with name {} under {}", childNodeName,
              appRoot);
        }
      }
      if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
        // If no loaded app exists for a particular split index and the split
        // index for which apps are being loaded is not the one configured, then
        // we do not need to keep track of this hierarchy for storing/updating/
        // removing app/app attempt znodes.
        rmAppRootHierarchies.remove(splitIndex);
      }
    }
  }

  /**
   * Loads all attempts of one application.
   *
   * Every child of {@code appPath} whose name starts with
   * {@link ApplicationAttemptId#appAttemptIdStrPrefix} is parsed as an
   * {@link ApplicationAttemptStateDataProto}. The result is added to
   * {@code appState.attempts}, keyed by its attempt id. Other children are
   * ignored.
   *
   * @param appState application whose attempts are populated
   * @param appPath full path of the application znode
   * @throws Exception if reading from ZooKeeper or parsing fails
   */
  private void loadApplicationAttemptState(ApplicationStateData appState,
      String appPath) throws Exception {
    for (String attemptIDStr : getChildren(appPath)) {
      if (!attemptIDStr.startsWith(
          ApplicationAttemptId.appAttemptIdStrPrefix)) {
        continue;
      }
      String attemptPath = getNodePath(appPath, attemptIDStr);
      byte[] attemptData = getData(attemptPath);

      ApplicationAttemptStateDataPBImpl attemptState =
          new ApplicationAttemptStateDataPBImpl(
              ApplicationAttemptStateDataProto.parseFrom(attemptData));

      appState.attempts.put(attemptState.getAttemptId(), attemptState);
    }
    // This runs once per application, so name the app znode instead of
    // claiming that all applications are done.
    LOG.debug("Done loading application attempts from znode: {}", appPath);
  }

  /**
   * Returns the parent znode of a split leaf znode: {@code path} with its
   * last {@code splitIndex} characters and the path separator before them
   * removed.
   * @param path path for which parent needs to be returned.
   * @param splitIndex split index.
   * @return parent app node path.
   */
  private String getSplitZnodeParent(String path, int splitIndex) {
    // The leaf name is splitIndex characters long; one more for the '/'.
    final int parentLength = path.length() - (splitIndex + 1);
    return path.substring(0, parentLength);
  }

  /**
   * Removes the intermediate parent znode of a split leaf when it no longer
   * has any children. Nothing is done for split index 0. A parent that is
   * already gone, or that gains children before the delete, is tolerated
   * and only logged at debug level.
   * @param path path of znode to be removed.
   * @param splitIndex split index.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private void checkRemoveParentZnode(String path, int splitIndex)
      throws Exception {
    if (splitIndex == 0) {
      // Split index 0 has no intermediate parent znode to prune.
      return;
    }
    final String parentZnode = getSplitZnodeParent(path, splitIndex);
    final List<String> remaining;
    try {
      remaining = getChildren(parentZnode);
    } catch (KeeperException.NoNodeException ke) {
      // Already deleted: nothing left to prune.
      LOG.debug("Unable to remove parent node {} as it does not exist.",
          parentZnode);
      return;
    }
    if (remaining == null || !remaining.isEmpty()) {
      return;
    }
    try {
      zkManager.safeDelete(parentZnode, zkAcl, fencingNodePath);
      LOG.debug("No leaf znode exists. Removing parent node {}",
          parentZnode);
    } catch (KeeperException.NotEmptyException ke) {
      // A child appeared in the meantime; the parent must stay.
      LOG.debug("Unable to remove app parent node {} as it has children.",
          parentZnode);
    }
  }

  /**
   * Loads the proxy CA certificate and private key into {@code rmState}.
   * If either znode is missing, or either read returns null, a warning is
   * logged and {@code rmState} is left untouched.
   * @param rmState state object receiving the recovered CA material.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private void loadProxyCAManagerState(RMState rmState) throws Exception {
    final String certNode = getNodePath(proxyCARoot, PROXY_CA_CERT_NODE);
    final String keyNode = getNodePath(proxyCARoot,
        PROXY_CA_PRIVATE_KEY_NODE);

    boolean bothPresent = exists(certNode) && exists(keyNode);
    if (!bothPresent) {
      LOG.warn("Couldn't find Proxy CA data");
      return;
    }

    byte[] certBytes = getData(certNode);
    byte[] keyBytes = getData(keyNode);
    boolean bothRead = certBytes != null && keyBytes != null;
    if (!bothRead) {
      LOG.warn("Couldn't recover Proxy CA data");
      return;
    }

    rmState.getProxyCAState().setCaCert(certBytes);
    rmState.getProxyCAState().setCaPrivateKey(keyBytes);
  }

  /**
   * Stores the state of a newly submitted application under the leaf znode
   * derived from the configured split index.
   *
   * The serialized state is validated against {@code zknodeLimit} before any
   * znode is touched. A rejected application therefore does not leave an
   * empty split parent znode behind.
   * @param appId application id.
   * @param appStateDataPB application state to persist.
   * @throws StoreLimitException if the serialized state exceeds
   *     {@code zknodeLimit}.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    long start = clock.getTime();

    // Serialize and size-check first: asking for the leaf path with
    // createParentIfNotExists=true creates the split parent znode.
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    if (appStateData.length > zknodeLimit) {
      LOG.debug("Application state data size for {} is {}",
          appId, appStateData.length);

      throw new StoreLimitException("Application " + appId
          + " exceeds the maximum allowed size for application data. "
          + "See yarn.resourcemanager.zk-max-znode-size.bytes.");
    }

    String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
    LOG.debug("Storing info for app: {} at: {}", appId, nodeCreatePath);

    zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl,
        CreateMode.PERSISTENT, zkAcl, fencingNodePath);
    opDurations.addStoreApplicationStateCallDuration(clock.getTime() - start);
  }

  /**
   * Updates the stored state of an application.
   *
   * The node is first looked up under the configured split index and then
   * under every other known split index. If it is found nowhere, it is
   * created under the configured split index. Any required split parent is
   * created through {@link #getLeafAppIdNodePath(String, boolean)}, which
   * tolerates the parent already existing.
   * @param appId application id.
   * @param appStateDataPB application state to persist.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void updateApplicationStateInternal(
      ApplicationId appId, ApplicationStateData appStateDataPB)
      throws Exception {
    long start = clock.getTime();
    String appIdStr = appId.toString();
    String nodeUpdatePath = getLeafAppIdNodePath(appIdStr, false);
    boolean pathExists = true;
    // Look for paths based on other split indices if path as per split index
    // does not exist.
    if (!exists(nodeUpdatePath)) {
      ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appIdStr);
      if (alternatePathInfo != null) {
        nodeUpdatePath = alternatePathInfo.path;
      } else {
        // No alternate path exists. Create path as per configured split
        // index; the shared helper creates the split parent if needed and
        // ignores a NodeExistsException for it.
        pathExists = false;
        nodeUpdatePath = getLeafAppIdNodePath(appIdStr, true);
      }
    }

    LOG.debug("Storing final state info for app: {} at: {}", appId,
        nodeUpdatePath);

    byte[] appStateData = appStateDataPB.getProto().toByteArray();

    if (pathExists) {
      zkManager.safeSetData(nodeUpdatePath, appStateData, -1, zkAcl,
          fencingNodePath);
    } else {
      zkManager.safeCreate(nodeUpdatePath, appStateData, zkAcl,
          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
      LOG.debug("Path {} for {} didn't exist. Creating a new znode to update"
          + " the application state.", nodeUpdatePath, appId);
    }
    opDurations.addUpdateApplicationStateCallDuration(clock.getTime() - start);
  }

  /*
   * Performs a STORE, UPDATE or REMOVE of an application attempt znode that
   * lives beneath its application's znode. The application znode is looked
   * up under the configured split index first and then under the alternate
   * split indices. If it cannot be found, REMOVE is a silent no-op and
   * STORE/UPDATE raise a YarnRuntimeException.
   */
  private void handleApplicationAttemptStateOp(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB, AppAttemptOp operation)
      throws Exception {
    final String appIdStr = appAttemptId.getApplicationId().toString();
    String appNodePath = getLeafAppIdNodePath(appIdStr, false);
    if (!exists(appNodePath)) {
      ZnodeSplitInfo alternate = getAlternateAppPath(appIdStr);
      if (alternate != null) {
        appNodePath = alternate.path;
      } else if (operation == AppAttemptOp.REMOVE) {
        // Unexpected; treat the attempt as already deleted.
        return;
      } else {
        throw new YarnRuntimeException("Unexpected Exception. App node for " +
            "app " + appIdStr + " not found");
      }
    }

    String attemptPath = getNodePath(appNodePath, appAttemptId.toString());
    byte[] attemptBytes = null;
    if (attemptStateDataPB != null) {
      attemptBytes = attemptStateDataPB.getProto().toByteArray();
    }
    LOG.debug("{} info for attempt: {} at: {}", operation, appAttemptId,
        attemptPath);

    switch (operation) {
    case STORE:
      zkManager.safeCreate(attemptPath, attemptBytes, zkAcl,
          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
      break;
    case UPDATE:
      if (!exists(attemptPath)) {
        zkManager.safeCreate(attemptPath, attemptBytes, zkAcl,
            CreateMode.PERSISTENT, zkAcl, fencingNodePath);
        LOG.debug("Path {} for {} didn't exist. Created a new znode to update"
            + " the application attempt state.", attemptPath, appAttemptId);
      } else {
        zkManager.safeSetData(attemptPath, attemptBytes, -1, zkAcl,
            fencingNodePath);
      }
      break;
    case REMOVE:
      safeDeleteAndCheckNode(attemptPath, zkAcl, fencingNodePath);
      break;
    default:
      break;
    }
  }

  /**
   * Persists a new application attempt znode.
   * @param appAttemptId attempt id.
   * @param attemptStateDataPB attempt state to persist.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void storeApplicationAttemptStateInternal(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB)
      throws Exception {
    final AppAttemptOp op = AppAttemptOp.STORE;
    handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB, op);
  }

  /**
   * Overwrites an application attempt znode, creating it if it is missing.
   * @param appAttemptId attempt id.
   * @param attemptStateDataPB attempt state to persist.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void updateApplicationAttemptStateInternal(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB)
      throws Exception {
    final AppAttemptOp op = AppAttemptOp.UPDATE;
    handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB, op);
  }

  /**
   * Deletes an application attempt znode; no state payload is needed.
   * @param appAttemptId attempt id.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void removeApplicationAttemptInternal(
      ApplicationAttemptId appAttemptId) throws Exception {
    final AppAttemptOp op = AppAttemptOp.REMOVE;
    handleApplicationAttemptStateOp(appAttemptId, null, op);
  }

  /**
   * Removes an application znode and its known attempt znodes under fencing,
   * and records the call duration.
   * @param appState state of the application to remove.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void removeApplicationStateInternal(
      ApplicationStateData appState) throws Exception {
    final long startTime = clock.getTime();
    ApplicationId applicationId =
        appState.getApplicationSubmissionContext().getApplicationId();
    removeApp(applicationId.toString(), true, appState.attempts.keySet());
    long elapsed = clock.getTime() - startTime;
    opDurations.addRemoveApplicationStateCallDuration(elapsed);
  }

  /**
   * Removes an application znode with an unfenced recursive delete.
   * @param removeAppId application id to remove.
   * @throws Exception if any exception occurs during ZK operation.
   */
  private void removeApp(String removeAppId) throws Exception {
    final boolean fenced = false;
    removeApp(removeAppId, fenced, null);
  }

  /**
   * Removes an application znode and its attempt znodes.
   *
   * The znode is looked up under the configured split index first and then
   * under the alternate split indices; if it is absent everywhere this is a
   * no-op. After removal, an emptied split parent znode is removed as well.
   *
   * @param removeAppId Application Id to be removed.
   * @param safeRemove when true, each listed attempt and then the app znode
   *     are deleted individually under fencing; when false, the app znode is
   *     deleted recursively through Curator without fencing.
   * @param attempts attempts to delete before the app znode. Only consulted
   *     when {@code safeRemove} is true; may be null.
   * @throws Exception if any exception occurs during ZK operation.
   */
  private void removeApp(String removeAppId, boolean safeRemove,
      Set<ApplicationAttemptId> attempts) throws Exception {
    String appNodePath = getLeafAppIdNodePath(removeAppId, false);
    int effectiveSplitIndex = appIdNodeSplitIndex;
    if (!exists(appNodePath)) {
      ZnodeSplitInfo alternate = getAlternateAppPath(removeAppId);
      if (alternate == null) {
        // Nothing stored for this application under any split layout.
        return;
      }
      appNodePath = alternate.path;
      effectiveSplitIndex = alternate.splitIndex;
    }

    if (!safeRemove) {
      zkManager.getCurator().delete().deletingChildrenIfNeeded()
          .forPath(appNodePath);
    } else {
      LOG.debug("Removing info for app: {} at: {} and its attempts.",
          removeAppId, appNodePath);
      if (attempts != null) {
        for (ApplicationAttemptId attempt : attempts) {
          String attemptNodePath =
              getNodePath(appNodePath, attempt.toString());
          safeDeleteAndCheckNode(attemptNodePath, zkAcl, fencingNodePath);
        }
      }
      safeDeleteAndCheckNode(appNodePath, zkAcl, fencingNodePath);
    }

    checkRemoveParentZnode(appNodePath, effectiveSplitIndex);
  }

  /**
   * Stores a new RM delegation token and, in the same fenced transaction,
   * records its sequence number in {@code dtSequenceNumberPath}.
   * @param rmDTIdentifier token identifier.
   * @param renewDate renew date of the token.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void storeRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
      throws Exception {
    final int sequenceNumber = rmDTIdentifier.getSequenceNumber();
    String tokenNodePath =
        getLeafDelegationTokenNodePath(sequenceNumber, true);
    LOG.debug("Storing {}{}", DELEGATION_TOKEN_PREFIX, sequenceNumber);

    RMDelegationTokenIdentifierData tokenData =
        new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
    ByteArrayOutputStream sequenceBuffer = new ByteArrayOutputStream();
    try (DataOutputStream sequenceOut = new DataOutputStream(sequenceBuffer)) {
      SafeTransaction transaction =
          zkManager.createTransaction(zkAcl, fencingNodePath);
      transaction.create(tokenNodePath, tokenData.toByteArray(), zkAcl,
          CreateMode.PERSISTENT);

      // Only storing a token (not updating one) advances the sequence number.
      sequenceOut.writeInt(sequenceNumber);
      LOG.debug("Storing {}. SequenceNumber: {}", dtSequenceNumberPath,
          sequenceNumber);
      transaction.setData(dtSequenceNumberPath, sequenceBuffer.toByteArray(),
          -1);
      transaction.commit();
    }
  }

  /**
   * Removes an RM delegation token znode, searching alternate split layouts
   * when it is not under the configured one. Does nothing if it is absent
   * everywhere. An emptied split parent znode is removed afterwards.
   * @param rmDTIdentifier token identifier.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void removeRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
    final int sequenceNumber = rmDTIdentifier.getSequenceNumber();
    String tokenNodePath =
        getLeafDelegationTokenNodePath(sequenceNumber, false);
    int effectiveSplitIndex = delegationTokenNodeSplitIndex;
    if (!exists(tokenNodePath)) {
      ZnodeSplitInfo alternate = getAlternateDTPath(sequenceNumber);
      if (alternate == null) {
        return;
      }
      tokenNodePath = alternate.path;
      effectiveSplitIndex = alternate.splitIndex;
    }

    LOG.debug("Removing RMDelegationToken_{}", sequenceNumber);
    safeDeleteAndCheckNode(tokenNodePath, zkAcl, fencingNodePath);
    checkRemoveParentZnode(tokenNodePath, effectiveSplitIndex);
  }

  /**
   * Overwrites an existing RM delegation token znode. If no znode exists for
   * the token under any split layout, it is stored afresh via
   * {@link #storeRMDelegationTokenState}.
   * @param rmDTIdentifier token identifier.
   * @param renewDate renew date of the token.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void updateRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
      throws Exception {
    final int sequenceNumber = rmDTIdentifier.getSequenceNumber();
    String tokenNodePath =
        getLeafDelegationTokenNodePath(sequenceNumber, false);
    if (!exists(tokenNodePath)) {
      ZnodeSplitInfo alternate = getAlternateDTPath(sequenceNumber);
      if (alternate == null) {
        storeRMDelegationTokenState(rmDTIdentifier, renewDate);
        return;
      }
      tokenNodePath = alternate.path;
    }

    LOG.debug("Updating {}{}", DELEGATION_TOKEN_PREFIX, sequenceNumber);
    byte[] tokenBytes =
        new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate)
            .toByteArray();
    zkManager.safeSetData(tokenNodePath, tokenBytes, -1, zkAcl,
        fencingNodePath);
  }

  /**
   * Serializes a delegation master key and stores it as a new znode under
   * {@code dtMasterKeysRootPath}.
   * @param delegationKey master key to store.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void storeRMDTMasterKeyState(
      DelegationKey delegationKey) throws Exception {
    final int keyId = delegationKey.getKeyId();
    String keyNodePath =
        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX + keyId);
    LOG.debug("Storing RMDelegationKey_{}", keyId);
    ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
    try (DataOutputStream keyOut = new DataOutputStream(keyBuffer)) {
      delegationKey.write(keyOut);
      zkManager.safeCreate(keyNodePath, keyBuffer.toByteArray(), zkAcl,
          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
    }
  }

  /**
   * Deletes the znode of a delegation master key under fencing; a missing
   * znode is tolerated by {@link #safeDeleteAndCheckNode}.
   * @param delegationKey master key to remove.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void removeRMDTMasterKeyState(
      DelegationKey delegationKey) throws Exception {
    final int keyId = delegationKey.getKeyId();
    String keyNodePath =
        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX + keyId);
    LOG.debug("Removing RMDelegationKey_{}", keyId);
    safeDeleteAndCheckNode(keyNodePath, zkAcl, fencingNodePath);
  }

  /**
   * Deletes the store's root znode via {@link #delete(String)}.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  public synchronized void deleteStore() throws Exception {
    final String storeRoot = zkRootNodePath;
    delete(storeRoot);
  }

  /**
   * Removes an application's znode and all of its children with an
   * unfenced recursive delete.
   * @param removeAppId application to remove.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  public synchronized void removeApplication(ApplicationId removeAppId)
      throws Exception {
    final String appIdStr = removeAppId.toString();
    removeApp(appIdStr);
  }

  /**
   * Joins a parent znode path and a child name with a "/" separator.
   * @param root parent znode path.
   * @param nodeName child znode name.
   * @return the combined znode path.
   */
  @VisibleForTesting
  String getNodePath(String root, String nodeName) {
    return root + '/' + nodeName;
  }

  /**
   * Writes the AMRM token secret manager state to
   * {@code amrmTokenSecretManagerRoot}. {@code isUpdate} is not consulted:
   * both store and update use {@code safeSetData} on that znode.
   * @param amrmTokenSecretManagerState state to persist.
   * @param isUpdate unused here.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void storeOrUpdateAMRMTokenSecretManagerState(
      AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
      throws Exception {
    byte[] serializedState = AMRMTokenSecretManagerState
        .newInstance(amrmTokenSecretManagerState).getProto().toByteArray();
    zkManager.safeSetData(amrmTokenSecretManagerRoot, serializedState, -1,
        zkAcl, fencingNodePath);
  }

  /**
   * Removes a reservation znode and, when its plan znode is left without
   * children, the plan znode as well.
   *
   * Missing znodes are tolerated. A missing reservation is handled by
   * {@link #safeDeleteAndCheckNode}. A missing plan znode (for example,
   * already cleaned up) ends the method instead of failing the store
   * operation with a NoNodeException from {@code getChildren}.
   * @param planName name of the plan.
   * @param reservationIdName name of the reservation.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void removeReservationState(String planName,
      String reservationIdName) throws Exception {
    String planNodePath = getNodePath(reservationRoot, planName);
    String reservationPath = getNodePath(planNodePath, reservationIdName);

    LOG.debug("Removing reservationallocation {} for plan {}",
        reservationIdName, planName);

    safeDeleteAndCheckNode(reservationPath, zkAcl, fencingNodePath);

    List<String> reservationNodes;
    try {
      reservationNodes = getChildren(planNodePath);
    } catch (KeeperException.NoNodeException nne) {
      // The plan node is already gone, so there is nothing left to clean up.
      LOG.debug("Plan node {} does not exist; skipping plan cleanup.",
          planNodePath);
      return;
    }

    if (reservationNodes != null && reservationNodes.isEmpty()) {
      safeDeleteAndCheckNode(planNodePath, zkAcl, fencingNodePath);
    }
  }

  /**
   * Stores a new reservation (and its plan znode if missing) in a single
   * fenced transaction.
   * @param reservationAllocation reservation to persist.
   * @param planName name of the plan.
   * @param reservationIdName name of the reservation.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected synchronized void storeReservationState(
      ReservationAllocationStateProto reservationAllocation, String planName,
      String reservationIdName) throws Exception {
    SafeTransaction transaction =
        zkManager.createTransaction(zkAcl, fencingNodePath);
    final boolean isUpdate = false;
    addOrUpdateReservationState(reservationAllocation, planName,
        reservationIdName, transaction, isUpdate);
    transaction.commit();
  }

  /**
   * Queues into {@code trx} the operations that store or update one
   * reservation znode. A create for the plan znode is queued first when that
   * znode does not currently exist. The caller commits {@code trx}.
   * @param reservationAllocation reservation to persist.
   * @param planName name of the plan.
   * @param reservationIdName name of the reservation.
   * @param trx transaction collecting the operations.
   * @param isUpdate true to overwrite an existing reservation znode, false to
   *     create a new one.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private void addOrUpdateReservationState(
      ReservationAllocationStateProto reservationAllocation, String planName,
      String reservationIdName, SafeTransaction trx, boolean isUpdate)
      throws Exception {
    final String planPath = getNodePath(reservationRoot, planName);
    final String reservationPath = getNodePath(planPath, reservationIdName);
    final byte[] payload = reservationAllocation.toByteArray();

    boolean planMissing = !exists(planPath);
    if (planMissing) {
      LOG.debug("Creating plan node: {} at: {}", planName, planPath);
      trx.create(planPath, null, zkAcl, CreateMode.PERSISTENT);
    }

    if (!isUpdate) {
      LOG.debug("Storing reservation: {} in plan:{} at: {}",
          reservationIdName, planName, reservationPath);
      trx.create(reservationPath, payload, zkAcl, CreateMode.PERSISTENT);
    } else {
      LOG.debug("Updating reservation: {} in plan:{} at: {}",
          reservationIdName, planName, reservationPath);
      trx.setData(reservationPath, payload, -1);
    }
  }

  /**
   * Stores the proxy CA certificate and private key.
   *
   * Both znodes are written in one fenced transaction so they always change
   * together. Two independent writes could leave a new certificate paired
   * with an old private key (or vice versa) if the second write failed.
   * Each znode is overwritten if it exists and created otherwise.
   * @param caCert CA certificate.
   * @param caPrivateKey CA private key.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @Override
  protected void storeProxyCACertState(
      X509Certificate caCert, PrivateKey caPrivateKey) throws Exception {
    byte[] caCertData = caCert.getEncoded();
    byte[] caPrivateKeyData = caPrivateKey.getEncoded();

    String caCertPath = getNodePath(proxyCARoot, PROXY_CA_CERT_NODE);
    String caPrivateKeyPath = getNodePath(proxyCARoot,
        PROXY_CA_PRIVATE_KEY_NODE);

    SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
    if (exists(caCertPath)) {
      trx.setData(caCertPath, caCertData, -1);
    } else {
      trx.create(caCertPath, caCertData, zkAcl, CreateMode.PERSISTENT);
    }
    if (exists(caPrivateKeyPath)) {
      trx.setData(caPrivateKeyPath, caPrivateKeyData, -1);
    } else {
      trx.create(caPrivateKeyPath, caPrivateKeyData, zkAcl,
          CreateMode.PERSISTENT);
    }
    trx.commit();
  }

  /**
   * Searches every split layout other than the configured one for an
   * existing znode of the given application. Callers use this after finding
   * that the path for the configured split index does not exist.
   * @param appId application id string.
   * @return a {@link ZnodeSplitInfo} object containing the path and split
   *    index if it exists, null otherwise.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private ZnodeSplitInfo getAlternateAppPath(String appId) throws Exception {
    for (Map.Entry<Integer, String> hierarchy :
        rmAppRootHierarchies.entrySet()) {
      int candidateSplit = hierarchy.getKey();
      if (candidateSplit == appIdNodeSplitIndex) {
        continue;
      }
      String candidatePath = getLeafZnodePath(appId, hierarchy.getValue(),
          candidateSplit, false);
      if (exists(candidatePath)) {
        return new ZnodeSplitInfo(candidatePath, candidateSplit);
      }
    }
    return null;
  }

  /**
   * Builds the leaf znode path for {@code nodeName} under {@code rootNode}.
   * With a split index of 0 the node sits directly under the root. Otherwise
   * its last {@code splitIdx} characters form the leaf name, and the rest
   * forms an intermediate parent znode. When
   * {@code createParentIfNotExists} is set, that parent is created if
   * missing, and a concurrent creation (NodeExistsException) is ignored.
   * @param nodeName the node name.
   * @param rootNode app root node based on split index.
   * @param splitIdx split index.
   * @param createParentIfNotExists flag which determines if parent znode
   *     needs to be created(as per split) if it does not exist.
   * @return leaf znode path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private String getLeafZnodePath(String nodeName, String rootNode,
      int splitIdx, boolean createParentIfNotExists) throws Exception {
    if (splitIdx == 0) {
      return getNodePath(rootNode, nodeName);
    }
    final int cut = nodeName.length() - splitIdx;
    String parentName = nodeName.substring(0, cut);
    String leafName = nodeName.substring(cut);
    String parentPath = getNodePath(rootNode, parentName);

    boolean mustCreateParent = createParentIfNotExists && !exists(parentPath);
    if (mustCreateParent) {
      try {
        zkManager.safeCreate(parentPath, null, zkAcl, CreateMode.PERSISTENT,
            zkAcl, fencingNodePath);
      } catch (KeeperException.NodeExistsException ignored) {
        LOG.debug("Unable to create app parent node {} as it already exists.",
            parentPath);
      }
    }
    return getNodePath(parentPath, leafName);
  }

  /**
   * Builds the leaf znode path of an application using the configured app
   * split index and its matching root from {@code rmAppRootHierarchies},
   * optionally creating the split parent znode.
   * @param appId application id.
   * @param createParentIfNotExists flag which determines if parent app node
   *     needs to be created(as per split) if it does not exist.
   * @return leaf app node path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private String getLeafAppIdNodePath(String appId,
      boolean createParentIfNotExists) throws Exception {
    String appRoot = rmAppRootHierarchies.get(appIdNodeSplitIndex);
    return getLeafZnodePath(appId, appRoot, appIdNodeSplitIndex,
        createParentIfNotExists);
  }

  /**
   * Builds the leaf znode path of a delegation token using the configured
   * delegation token split index, optionally creating the split parent
   * znode. When the split index is non-zero, the sequence number is
   * zero-padded to at least 4 digits so the split is consistent.
   * @param rmDTSequenceNumber delegation token sequence number.
   * @param createParentIfNotExists flag which determines if parent znode
   *     needs to be created(as per split) if it does not exist.
   * @return leaf delegation token node path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
      boolean createParentIfNotExists) throws Exception {
    final int configuredSplit = delegationTokenNodeSplitIndex;
    return getLeafDelegationTokenNodePath(rmDTSequenceNumber,
        createParentIfNotExists, configuredSplit);
  }

  /**
   * Returns leaf delegation token node path based on sequence number and
   * passed split index. If the passed flag createParentIfNotExists is true,
   * also creates the parent znode if it does not exist. For a non-zero split
   * index, the sequence number is zero-padded to at least 4 digits to keep
   * the split consistent.
   *
   * The padding is formatted with {@link java.util.Locale#ROOT}. Znode names
   * are persistent identifiers shared between RMs, and formatting with the
   * JVM default locale can emit locale-specific (non-ASCII) digits.
   * @param rmDTSequenceNumber delegation token sequence number.
   * @param createParentIfNotExists flag which determines if parent znode
   *     needs to be created(as per split) if it does not exist.
   * @param split the split index to use
   * @return leaf delegation token node path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
      boolean createParentIfNotExists, int split) throws Exception {
    String nodeName = DELEGATION_TOKEN_PREFIX;
    if (split == 0) {
      nodeName += rmDTSequenceNumber;
    } else {
      nodeName += String.format(java.util.Locale.ROOT, "%04d",
          rmDTSequenceNumber);
    }
    return getLeafZnodePath(nodeName, rmDelegationTokenHierarchies.get(split),
        split, createParentIfNotExists);
  }

  /**
   * Searches every delegation token split layout other than the configured
   * one for an existing znode of the given token.
   * @param rmDTSequenceNumber delegation token sequence number.
   * @return a {@link ZnodeSplitInfo} object containing the path and split
   *    index if it exists, null otherwise.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  private ZnodeSplitInfo getAlternateDTPath(int rmDTSequenceNumber)
      throws Exception {
    for (int candidateSplit : rmDelegationTokenHierarchies.keySet()) {
      if (candidateSplit == delegationTokenNodeSplitIndex) {
        continue;
      }
      String candidatePath = getLeafDelegationTokenNodePath(
          rmDTSequenceNumber, false, candidateSplit);
      if (exists(candidatePath)) {
        return new ZnodeSplitInfo(candidatePath, candidateSplit);
      }
    }
    return null;
  }

  /**
   * Reads the data of a znode through the ZK curator manager.
   * @param path znode path.
   * @return the znode data.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @VisibleForTesting
  byte[] getData(String path) throws Exception {
    return this.zkManager.getData(path);
  }

  /**
   * Reads the ACL list of a znode through the ZK curator manager.
   * @param path znode path.
   * @return the znode ACLs.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @VisibleForTesting
  List<ACL> getACL(String path) throws Exception {
    return this.zkManager.getACL(path);
  }

  /**
   * Lists the child names of a znode through the ZK curator manager.
   * @param path znode path.
   * @return the child znode names.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @VisibleForTesting
  List<String> getChildren(String path) throws Exception {
    return this.zkManager.getChildren(path);
  }

  /**
   * Checks whether a znode exists, via the ZK curator manager.
   * @param path znode path.
   * @return true if the znode exists.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @VisibleForTesting
  boolean exists(String path) throws Exception {
    return this.zkManager.exists(path);
  }

  /**
   * Creates a znode with this store's ACLs through the ZK curator manager.
   * @param path znode path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @VisibleForTesting
  void create(String path) throws Exception {
    this.zkManager.create(path, zkAcl);
  }

  /**
   * Deletes a znode through the ZK curator manager (unfenced).
   * @param path znode path.
   * @throws Exception if any problem occurs while performing ZK operation.
   */
  @VisibleForTesting
  void delete(String path) throws Exception {
    this.zkManager.delete(path);
  }

  /**
   * Deletes a znode under fencing while tolerating it being already gone.
   *
   * If the fenced delete reports NoNodeException and the node really does
   * not exist, the exception is ignored (logged at info) to avoid triggering
   * a ResourceManager failover over an already-completed delete. If the node
   * does exist on re-check, a NodeExistsException is thrown, with the
   * original NoNodeException attached as its cause.
   * @param path Path to be deleted.
   * @param fencingACL fencingACL.
   * @param fencingPath fencingNodePath.
   * @throws Exception if any problem occurs while performing deletion.
   */
  public void safeDeleteAndCheckNode(String path, List<ACL> fencingACL,
      String fencingPath) throws Exception {
    try {
      zkManager.safeDelete(path, fencingACL, fencingPath);
    } catch (KeeperException.NoNodeException nne) {
      if (!exists(path)) {
        LOG.info("Node {} doesn't exist to delete", path);
      } else {
        KeeperException unexpected = new KeeperException.NodeExistsException(
            "Node " + path + " should not exist");
        // Preserve the delete failure that led to this inconsistency.
        unexpected.initCause(nne);
        throw unexpected;
      }
    }
  }

  /**
   * Helper class that periodically commits a fenced transaction (creating
   * and deleting the fencing node) to ensure that this RM continues to be
   * the Active.
   */
  private class VerifyActiveStatusThread extends Thread {
    VerifyActiveStatusThread() {
      super(VerifyActiveStatusThread.class.getName());
    }

    /**
     * Commits a fenced transaction every {@code zkSessionTimeout}
     * milliseconds until the store reports it is fenced. Interruption ends
     * the loop and re-sets the interrupt flag. Any other failure is logged
     * and reported to the store as a {@link StoreFencedException}.
     */
    @Override
    public void run() {
      try {
        while (!isFencedState()) {
          // Create and delete fencing node
          zkManager.createTransaction(zkAcl, fencingNodePath).commit();
          Thread.sleep(zkSessionTimeout);
        }
      } catch (InterruptedException ie) {
        LOG.info("{} thread interrupted! Exiting!", getName());
        interrupt();
      } catch (Exception e) {
        // Log the underlying failure; otherwise the reason this RM
        // considers itself fenced is lost.
        LOG.warn("{} failed to verify active status; reporting the store as"
            + " fenced.", getName(), e);
        notifyStoreOperationFailed(new StoreFencedException());
      }
    }
  }
}