RMStateStoreTestBase.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import javax.crypto.SecretKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.webproxy.ProxyCA;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

public class RMStateStoreTestBase {

  /** Logger for this test base class. */
  public static final Logger LOG =
      LoggerFactory.getLogger(RMStateStoreTestBase.class);

  /** First epoch value that {@code testEpoch} expects the store to hand out. */
  protected final long epoch = 10L;

  /**
   * Number of additional epoch increments {@code testEpoch} performs before
   * it expects the epoch to have wrapped around; exposed through
   * {@link #getEpochRange()}.
   */
  private final long epochRange = 10L;

  /**
   * Dispatcher stub that is its own event handler. Every handled event sets
   * {@link #notified} and wakes all threads waiting on this object's monitor,
   * which is how {@code waitNotify} learns that the store has reported back.
   */
  static class TestDispatcher implements Dispatcher, EventHandler<Event> {

    /**
     * Attempt id that the next {@link RMAppAttemptEvent} must carry. Written
     * by the test before it triggers a store operation; volatile because the
     * event may be handled on another thread.
     */
    volatile ApplicationAttemptId attemptId;

    /**
     * Set to {@code true} when an event has been handled; the waiting test
     * resets it to {@code false} once it has observed the notification.
     */
    volatile boolean notified = false;

    @SuppressWarnings("rawtypes")
    @Override
    public void register(Class<? extends Enum> eventType,
                         EventHandler handler) {
      // Nothing to register: getEventHandler() always returns this object.
    }

    /**
     * Verifies that attempt events refer to the expected attempt and then
     * signals any waiter.
     *
     * @param event the event delivered to this dispatcher
     */
    @Override
    public void handle(Event event) {
      if (event instanceof RMAppAttemptEvent) {
        RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event;
        assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId());
      }
      // Publish the flag under the same monitor the waiter uses, so the
      // state change and the wakeup are observed together.
      synchronized (this) {
        notified = true;
        notifyAll();
      }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public EventHandler<Event> getEventHandler() {
      return this;
    }

  }

  /**
   * Hook object passed to {@code testRMAppStateStore}. Both callbacks are
   * no-ops here; a test can override them to inspect the store right after
   * the corresponding write.
   */
  public static class StoreStateVerifier {
    /** Called by {@code testRMAppStateStore} after the first app is stored. */
    void afterStoreApp(RMStateStore store, ApplicationId appId) {}
    /**
     * Called by {@code testRMAppStateStore} after the attempt of the
     * to-be-removed app has been stored.
     */
    void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId
            appAttId) {}
  }

  /**
   * Adapter supplied by a concrete store test. The shared test methods in
   * this class obtain the store through it and use it to inspect or alter
   * what the store has persisted.
   */
  interface RMStateStoreHelper {
    /** Returns the store to test; the shared tests call this again to reload state. */
    RMStateStore getRMStateStore() throws Exception;
    /** Checked with {@code assertTrue} at the end of {@code testRMAppStateStore}. */
    boolean isFinalStateValid() throws Exception;
    /** Writes {@code version} so that {@code testCheckVersion} can load and check it. */
    void writeVersion(Version version) throws Exception;
    /** Version that {@code testCheckVersion} treats as the store's default version. */
    Version getCurrentVersion() throws Exception;
    /** Reports whether {@code app} is currently present in the store. */
    boolean appExists(RMApp app) throws Exception;
    /** Reports whether {@code attempt} is currently present in the store. */
    boolean attemptExists(RMAppAttempt attempt) throws Exception;
  }

  /**
   * @return the epoch range this test base uses when checking epoch
   *         wrap-around in {@code testEpoch}
   */
  public long getEpochRange() {
    return this.epochRange;
  }

  /**
   * Blocks until {@code dispatcher} has handled an event, then clears its
   * {@code notified} flag so the dispatcher can be reused for the next store
   * operation.
   *
   * <p>The flag is tested while holding the dispatcher's monitor, so a
   * notification sent between the test and the call to {@code wait} cannot be
   * missed. The test fails if no event arrives within 60 seconds or if the
   * waiting thread is interrupted.
   *
   * @param dispatcher the dispatcher whose notification is awaited
   */
  void waitNotify(TestDispatcher dispatcher) {
    final long deadline = System.currentTimeMillis() + 1000 * 60;
    synchronized (dispatcher) {
      while (!dispatcher.notified) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          fail("Timed out attempt store notification");
        }
        try {
          // Wake up at least once a second; never pass 0, which would
          // mean "wait forever".
          dispatcher.wait(Math.min(remaining, 1000));
        } catch (InterruptedException e) {
          // Keep the interrupt visible to the test runner and stop waiting
          // instead of printing the stack trace and carrying on.
          Thread.currentThread().interrupt();
          fail("Interrupted while waiting for attempt store notification");
        }
      }
      dispatcher.notified = false;
    }
  }

  /**
   * Builds a mocked {@link RMApp} for {@code appId} and hands it to
   * {@code store.storeNewApplication}.
   *
   * @param store the store that receives the new application
   * @param appId id reported by the mock and set on its submission context
   * @param submitTime submit time reported by the mock
   * @param startTime start time reported by the mock
   * @return the mocked application that was passed to the store
   * @throws Exception if the store call fails
   */
  protected RMApp storeApp(RMStateStore store, ApplicationId appId,
      long submitTime, long startTime) throws Exception {
    ApplicationSubmissionContext submissionContext =
        new ApplicationSubmissionContextPBImpl();
    submissionContext.setAMContainerSpec(new ContainerLaunchContextPBImpl());
    submissionContext.setApplicationId(appId);
    CallerContext callerContext =
        new CallerContext.Builder("context").build();

    RMApp app = mock(RMApp.class);
    when(app.getUser()).thenReturn("test");
    when(app.getApplicationId()).thenReturn(appId);
    when(app.getApplicationSubmissionContext())
        .thenReturn(submissionContext);
    when(app.getCallerContext()).thenReturn(callerContext);
    when(app.getSubmitTime()).thenReturn(submitTime);
    when(app.getStartTime()).thenReturn(startTime);

    store.storeNewApplication(app);
    return app;
  }

  /**
   * Builds a mocked {@link RMAppAttempt}, passes it to
   * {@code store.storeNewApplicationAttempt} and waits until
   * {@code dispatcher} has been notified.
   *
   * @param store the store that receives the new attempt
   * @param attemptId id reported by the mock; also the id the dispatcher
   *        expects in the resulting attempt event
   * @param containerIdStr string form of the master container id
   * @param appToken AMRM token reported by the mock (callers may pass null)
   * @param clientTokenMasterKey client token master key reported by the mock
   *        (callers may pass null)
   * @param dispatcher dispatcher registered with the store
   * @return the mocked attempt that was passed to the store
   * @throws Exception if the store call fails
   */
  protected RMAppAttempt storeAttempt(RMStateStore store,
      ApplicationAttemptId attemptId,
      String containerIdStr, Token<AMRMTokenIdentifier> appToken,
      SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
      throws Exception {

    Container masterContainer = new ContainerPBImpl();
    masterContainer.setId(ContainerId.fromString(containerIdStr));

    AggregateAppResourceUsage emptyUsage =
        new AggregateAppResourceUsage(new HashMap<>());
    RMAppAttemptMetrics attemptMetrics = mock(RMAppAttemptMetrics.class);
    when(attemptMetrics.getAggregateAppResourceUsage())
        .thenReturn(emptyUsage);

    RMAppAttempt attempt = mock(RMAppAttempt.class);
    when(attempt.getAppAttemptId()).thenReturn(attemptId);
    when(attempt.getMasterContainer()).thenReturn(masterContainer);
    when(attempt.getAMRMToken()).thenReturn(appToken);
    when(attempt.getClientTokenMasterKey())
        .thenReturn(clientTokenMasterKey);
    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetrics);

    // the dispatcher checks incoming attempt events against this id
    dispatcher.attemptId = attemptId;
    store.storeNewApplicationAttempt(attempt);
    waitNotify(dispatcher);
    return attempt;
  }

  /**
   * Sends {@code attemptState} to
   * {@code store.updateApplicationAttemptState} and waits until
   * {@code dispatcher} has been notified.
   *
   * @param store the store to update
   * @param dispatcher dispatcher registered with the store
   * @param attemptState the new attempt state; its attempt id is the one the
   *        dispatcher expects in the resulting event
   */
  protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher,
      ApplicationAttemptStateData attemptState) {
    ApplicationAttemptId expectedAttemptId = attemptState.getAttemptId();
    dispatcher.attemptId = expectedAttemptId;

    store.updateApplicationAttemptState(attemptState);
    waitNotify(dispatcher);
  }

  /**
   * Runs the application state store test with a verifier whose callbacks do
   * nothing.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
          throws Exception {
    StoreStateVerifier noOpVerifier = new StoreStateVerifier();
    testRMAppStateStore(stateStoreHelper, noOpVerifier);
  }

  /**
   * Stores, removes, reloads and updates application and attempt state, and
   * checks after each reload that the store returns what was written.
   *
   * <p>Steps: (1) store one app with two attempts; (2) store and remove a
   * second app, twice; (3) close, call {@link #modifyAppState()}, reload and
   * verify the first app and both attempts; (4) update the app and attempt 2
   * to FINISHED and also "update" an app/attempt that was never stored;
   * (5) close, reload and verify the updated state; (6) ask the helper
   * whether the final store state is valid.
   *
   * @param stateStoreHelper access to the store under test
   * @param verifier callbacks invoked after the first app and after the
   *        to-be-removed attempt have been stored
   * @throws Exception if any store operation fails
   */
  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
                           StoreStateVerifier verifier)
      throws Exception {
    long submitTime = System.currentTimeMillis();
    long startTime = System.currentTimeMillis() + 1234;
    Configuration conf = new YarnConfiguration();
    RMStateStore store = stateStoreHelper.getRMStateStore();
    TestDispatcher dispatcher = new TestDispatcher();
    store.setRMDispatcher(dispatcher);

    RMContext rmContext = mock(RMContext.class);
    when(rmContext.getStateStore()).thenReturn(store);

    AMRMTokenSecretManager appTokenMgr =
        spy(new AMRMTokenSecretManager(conf, rmContext));

    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);

    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
        new ClientToAMTokenSecretManagerInRM();

    ApplicationAttemptId attemptId1 = ApplicationAttemptId.fromString(
        "appattempt_1352994193343_0001_000001");
    ApplicationId appId1 = attemptId1.getApplicationId();
    storeApp(store, appId1, submitTime, startTime);
    verifier.afterStoreApp(store, appId1);

    // create application token and client token key for attempt1
    Token<AMRMTokenIdentifier> appAttemptToken1 =
        generateAMRMToken(attemptId1, appTokenMgr);
    SecretKey clientTokenKey1 =
        clientToAMTokenMgr.createMasterKey(attemptId1);

    ContainerId containerId1 = storeAttempt(store, attemptId1,
        "container_1352994193343_0001_01_000001",
        appAttemptToken1, clientTokenKey1, dispatcher)
        .getMasterContainer().getId();

    String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
    ApplicationAttemptId attemptId2 = ApplicationAttemptId.fromString(
        appAttemptIdStr2);

    // create application token and client token key for attempt2
    Token<AMRMTokenIdentifier> appAttemptToken2 =
        generateAMRMToken(attemptId2, appTokenMgr);
    SecretKey clientTokenKey2 =
        clientToAMTokenMgr.createMasterKey(attemptId2);

    ContainerId containerId2 = storeAttempt(store, attemptId2,
        "container_1352994193343_0001_02_000001",
        appAttemptToken2, clientTokenKey2, dispatcher)
        .getMasterContainer().getId();

    ApplicationAttemptId attemptIdRemoved = ApplicationAttemptId.fromString(
        "appattempt_1352994193343_0002_000001");
    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
    storeApp(store, appIdRemoved, submitTime, startTime);
    storeAttempt(store, attemptIdRemoved,
        "container_1352994193343_0002_01_000001", null, null, dispatcher);
    verifier.afterStoreAppAttempt(store, attemptIdRemoved);

    // mocked view of the second app, used only for the removal calls
    RMApp mockRemovedApp = mock(RMApp.class);
    RMAppAttemptMetrics mockRmAppAttemptMetrics =
        mock(RMAppAttemptMetrics.class);
    HashMap<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
    ApplicationSubmissionContext context =
        new ApplicationSubmissionContextPBImpl();
    context.setApplicationId(appIdRemoved);
    when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
    when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
    when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
    when(mockRemovedApp.getUser()).thenReturn("user1");
    RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
    when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
    when(mockRemovedAttempt.getRMAppAttemptMetrics())
        .thenReturn(mockRmAppAttemptMetrics);
    when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
        .thenReturn(new AggregateAppResourceUsage(new HashMap<>()));
    attempts.put(attemptIdRemoved, mockRemovedAttempt);
    store.removeApplication(mockRemovedApp);

    // remove application directory recursively.
    storeApp(store, appIdRemoved, submitTime, startTime);
    storeAttempt(store, attemptIdRemoved,
        "container_1352994193343_0002_01_000001", null, null, dispatcher);
    store.removeApplication(mockRemovedApp);

    // let things settle down
    Thread.sleep(1000);
    store.close();

    // give tester a chance to modify app state in the store
    modifyAppState();

    // load state
    store = stateStoreHelper.getRMStateStore();
    store.setRMDispatcher(dispatcher);
    RMState state = store.loadState();
    Map<ApplicationId, ApplicationStateData> rmAppState =
        state.getApplicationState();

    ApplicationStateData appState = rmAppState.get(appId1);
    // app is loaded
    assertNotNull(appState);
    // app is loaded correctly
    assertEquals(submitTime, appState.getSubmitTime());
    assertEquals(startTime, appState.getStartTime());
    // submission context is loaded correctly
    assertEquals(appId1,
                 appState.getApplicationSubmissionContext().getApplicationId());
    ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1);
    // attempt1 is loaded correctly
    assertNotNull(attemptState);
    assertEquals(attemptId1, attemptState.getAttemptId());
    assertEquals(-1000, attemptState.getAMContainerExitStatus());
    // attempt1 container is loaded correctly
    assertEquals(containerId1, attemptState.getMasterContainer().getId());
    // attempt1 client token master key is loaded correctly
    assertArrayEquals(
        clientTokenKey1.getEncoded(),
        attemptState.getAppAttemptTokens()
            .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
    assertEquals("context", appState.getCallerContext().getContext());

    attemptState = appState.getAttempt(attemptId2);
    // attempt2 is loaded correctly
    assertNotNull(attemptState);
    assertEquals(attemptId2, attemptState.getAttemptId());
    // attempt2 container is loaded correctly
    assertEquals(containerId2, attemptState.getMasterContainer().getId());
    // attempt2 client token master key is loaded correctly
    assertArrayEquals(
        clientTokenKey2.getEncoded(),
        attemptState.getAppAttemptTokens()
            .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));

    //******* update application/attempt state *******//
    ApplicationStateData appState2 =
        ApplicationStateData.newInstance(appState.getSubmitTime(),
            appState.getStartTime(), appState.getUser(),
            appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
            "appDiagnostics", 123, 1234, appState.getCallerContext());
    appState2.attempts.putAll(appState.attempts);
    store.updateApplicationState(appState2);

    // attemptState refers to attempt2 at this point
    ApplicationAttemptStateData oldAttemptState = attemptState;
    ApplicationAttemptStateData newAttemptState =
        ApplicationAttemptStateData.newInstance(
            oldAttemptState.getAttemptId(),
            oldAttemptState.getMasterContainer(),
            oldAttemptState.getAppAttemptTokens(),
            oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
            "myTrackingUrl", "attemptDiagnostics",
            FinalApplicationStatus.SUCCEEDED, 100,
            oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>(),
            0);
    store.updateApplicationAttemptState(newAttemptState);

    // test updating the state of an app/attempt whose initial state was not
    // saved.
    ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
    ApplicationSubmissionContext dummyContext =
        new ApplicationSubmissionContextPBImpl();
    dummyContext.setApplicationId(dummyAppId);
    dummyContext.setAMContainerSpec(new ContainerLaunchContextPBImpl());
    ApplicationStateData dummyApp =
        ApplicationStateData.newInstance(appState.getSubmitTime(),
            appState.getStartTime(), appState.getUser(), dummyContext,
            RMAppState.FINISHED, "appDiagnostics", 123, 1234, null);
    store.updateApplicationState(dummyApp);

    ApplicationAttemptId dummyAttemptId =
        ApplicationAttemptId.newInstance(dummyAppId, 6);
    ApplicationAttemptStateData dummyAttempt =
        ApplicationAttemptStateData.newInstance(dummyAttemptId,
            oldAttemptState.getMasterContainer(),
            oldAttemptState.getAppAttemptTokens(),
            oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
            "myTrackingUrl", "attemptDiagnostics",
            FinalApplicationStatus.SUCCEEDED, 111,
            oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>(),
            0);
    store.updateApplicationAttemptState(dummyAttempt);

    // let things settle down
    Thread.sleep(1000);
    store.close();

    // check updated application state.
    store = stateStoreHelper.getRMStateStore();
    store.setRMDispatcher(dispatcher);
    RMState newRMState = store.loadState();
    Map<ApplicationId, ApplicationStateData> newRMAppState =
        newRMState.getApplicationState();
    assertNotNull(newRMAppState.get(
        dummyApp.getApplicationSubmissionContext().getApplicationId()));
    ApplicationStateData updatedAppState = newRMAppState.get(appId1);
    assertNotNull(updatedAppState);
    assertEquals(appState.getApplicationSubmissionContext().getApplicationId(),
        updatedAppState.getApplicationSubmissionContext().getApplicationId());
    assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
    assertEquals(appState.getStartTime(), updatedAppState.getStartTime());
    assertEquals(appState.getUser(), updatedAppState.getUser());
    // new app state fields
    assertEquals(RMAppState.FINISHED, updatedAppState.getState());
    assertEquals("appDiagnostics", updatedAppState.getDiagnostics());
    assertEquals(1234, updatedAppState.getFinishTime());

    // check updated attempt state
    assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext
        ().getApplicationId()).getAttempt(dummyAttemptId));
    ApplicationAttemptStateData updatedAttemptState =
        updatedAppState.getAttempt(newAttemptState.getAttemptId());
    assertNotNull(updatedAttemptState);
    assertEquals(oldAttemptState.getAttemptId(),
      updatedAttemptState.getAttemptId());
    assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId());
    // the client token master key must be read from the reloaded state;
    // the pre-update state was already verified above
    assertArrayEquals(
        clientTokenKey2.getEncoded(),
        updatedAttemptState.getAppAttemptTokens()
            .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
    // new attempt state fields
    assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
    assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
    assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
    assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
    assertEquals(FinalApplicationStatus.SUCCEEDED,
      updatedAttemptState.getFinalApplicationStatus());

    // assert store is in expected state after everything is cleaned
    assertTrue(stateStoreHelper.isFinalStateValid());

    store.close();
  }

  /**
   * Stores, updates and removes an RM delegation token and a delegation
   * master key, reloading the secret manager state after every step and
   * comparing it with the expected tokens, keys and sequence number.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testRMDTSecretManagerStateStore(
      RMStateStoreHelper stateStoreHelper) throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    TestDispatcher dispatcher = new TestDispatcher();
    store.setRMDispatcher(dispatcher);

    // store RM delegation token;
    RMDelegationTokenIdentifier dtId1 =
        new RMDelegationTokenIdentifier(new Text("owner1"),
          new Text("renewer1"), new Text("realuser1"));
    int sequenceNumber = 1111;
    dtId1.setSequenceNumber(sequenceNumber);
    byte[] tokenBeforeStore = dtId1.getBytes();
    // autoboxing replaces the deprecated Long(long) constructor
    Long renewDate1 = System.currentTimeMillis();
    store.storeRMDelegationToken(dtId1, renewDate1);
    modifyRMDelegationTokenState();
    Map<RMDelegationTokenIdentifier, Long> token1 = new HashMap<>();
    token1.put(dtId1, renewDate1);
    // store delegation key;
    DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
    HashSet<DelegationKey> keySet = new HashSet<>();
    keySet.add(key);
    store.storeRMDTMasterKey(key);

    RMDTSecretManagerState secretManagerState =
        store.loadState().getRMDTSecretManagerState();
    assertEquals(token1, secretManagerState.getTokenState());
    assertEquals(keySet, secretManagerState.getMasterKeyState());
    assertEquals(sequenceNumber,
        secretManagerState.getDTSequenceNumber());
    RMDelegationTokenIdentifier tokenAfterStore =
        secretManagerState.getTokenState().keySet().iterator().next();
    assertArrayEquals(tokenBeforeStore, tokenAfterStore.getBytes());

    // update RM delegation token;
    renewDate1 = System.currentTimeMillis();
    store.updateRMDelegationToken(dtId1, renewDate1);
    token1.put(dtId1, renewDate1);

    RMDTSecretManagerState updateSecretManagerState =
        store.loadState().getRMDTSecretManagerState();
    assertEquals(token1, updateSecretManagerState.getTokenState());
    assertEquals(keySet, updateSecretManagerState.getMasterKeyState());
    assertEquals(sequenceNumber,
        updateSecretManagerState.getDTSequenceNumber());

    // check to delete delegationKey
    store.removeRMDTMasterKey(key);
    keySet.clear();
    RMDTSecretManagerState noKeySecretManagerState =
        store.loadState().getRMDTSecretManagerState();
    assertEquals(token1, noKeySecretManagerState.getTokenState());
    assertEquals(keySet, noKeySecretManagerState.getMasterKeyState());
    assertEquals(sequenceNumber,
        noKeySecretManagerState.getDTSequenceNumber());

    // check to delete delegationToken
    store.removeRMDelegationToken(dtId1);
    RMDTSecretManagerState noKeyAndTokenSecretManagerState =
        store.loadState().getRMDTSecretManagerState();
    token1.clear();
    assertEquals(token1,
        noKeyAndTokenSecretManagerState.getTokenState());
    assertEquals(keySet,
        noKeyAndTokenSecretManagerState.getMasterKeyState());
    // check the state loaded after the token removal, not the earlier one
    assertEquals(sequenceNumber,
        noKeyAndTokenSecretManagerState.getDTSequenceNumber());
    store.close();

  }

  /**
   * Creates an AMRM token for {@code attemptId} and sets its service field
   * to a fixed test value.
   *
   * @param attemptId the attempt the token is created for
   * @param appTokenMgr the secret manager that creates the token
   * @return the token with its service set
   */
  protected Token<AMRMTokenIdentifier> generateAMRMToken(
      ApplicationAttemptId attemptId,
      AMRMTokenSecretManager appTokenMgr) {
    Token<AMRMTokenIdentifier> amrmToken =
        appTokenMgr.createAndGetAMRMToken(attemptId);
    Text service = new Text("appToken service");
    amrmToken.setService(service);
    return amrmToken;
  }

  /**
   * Checks {@code RMStateStore.checkVersion()} against three stored
   * versions: the default one, one with a higher minor version (expected to
   * be accepted and overwritten with the default) and one with a higher
   * major version (expected to be rejected).
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails unexpectedly
   */
  public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
      throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    store.setRMDispatcher(new TestDispatcher());

    // default version
    Version defaultVersion = stateStoreHelper.getCurrentVersion();
    store.checkVersion();
    assertEquals(defaultVersion, store.loadVersion());

    // compatible version
    Version compatibleVersion =
        Version.newInstance(defaultVersion.getMajorVersion(),
          defaultVersion.getMinorVersion() + 2);
    stateStoreHelper.writeVersion(compatibleVersion);
    assertEquals(compatibleVersion, store.loadVersion());
    store.checkVersion();
    // overwrite the compatible version
    assertEquals(defaultVersion, store.loadVersion());

    // incompatible version
    Version incompatibleVersion =
        Version.newInstance(defaultVersion.getMajorVersion() + 2,
          defaultVersion.getMinorVersion());
    stateStoreHelper.writeVersion(incompatibleVersion);
    try {
      store.checkVersion();
      fail("Invalid version, should fail.");
    } catch (RMStateVersionIncompatibleException expected) {
      // Expected. Catching only this type lets the failure raised by fail()
      // above, and any unrelated exception, propagate with its own message.
    }
  }
  
  /**
   * Checks the values returned by successive
   * {@code getAndIncrementEpoch()} calls: the first three must be
   * {@code epoch}, {@code epoch + 1} and {@code epoch + 2}; after
   * {@code epochRange} further calls the next value must be
   * {@code epoch + 3}.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testEpoch(RMStateStoreHelper stateStoreHelper)
      throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    store.setRMDispatcher(new TestDispatcher());

    // first, second and third call
    for (long offset = 0; offset < 3; offset++) {
      long observedEpoch = store.getAndIncrementEpoch();
      assertEquals(epoch + offset, observedEpoch);
    }

    long remainingCalls = epochRange;
    while (remainingCalls > 0) {
      store.getAndIncrementEpoch();
      remainingCalls--;
    }

    // Epoch should have wrapped around and then incremented once for a total
    // of + 3
    long wrappedEpoch = store.getAndIncrementEpoch();
    assertEquals(epoch + 3, wrappedEpoch);
  }

  /**
   * Stores five apps, then removes them one by one, polling the helper every
   * 100 ms until each app is no longer reported as existing.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
      throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    store.setRMDispatcher(new TestDispatcher());
    ArrayList<RMApp> storedApps =
        createAndStoreApps(stateStoreHelper, store, 5);

    for (RMApp storedApp : storedApps) {
      store.removeApplication(storedApp);
      // poll until the removal is visible through the helper
      while (stateStoreHelper.appExists(storedApp)) {
        Thread.sleep(100);
      }
    }
  }

  /**
   * Stores {@code numApps} mocked apps with consecutive ids and waits,
   * polling every 100 ms, until the helper reports each of them as existing.
   *
   * @param stateStoreHelper used to check that each app has been stored
   * @param store the store that receives the apps
   * @param numApps number of apps to create
   * @return the stored apps in creation order
   * @throws Exception if any store operation fails
   */
  private ArrayList<RMApp> createAndStoreApps(
      RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps)
      throws Exception {
    ArrayList<RMApp> storedApps = new ArrayList<RMApp>();
    int nextId = 0;
    while (nextId < numApps) {
      ApplicationId appId = ApplicationId.newInstance(1383183338, nextId);
      storedApps.add(storeApp(store, appId, 123456789, 987654321));
      nextId++;
    }

    assertEquals(numApps, storedApps.size());
    for (RMApp storedApp : storedApps) {
      // poll until the app is visible through the helper
      while (!stateStoreHelper.appExists(storedApp)) {
        Thread.sleep(100);
      }
    }
    return storedApps;
  }

  /**
   * Stores five apps, calls {@code deleteStore()} and asserts that the
   * helper no longer reports any of them as existing.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testDeleteStore(RMStateStoreHelper stateStoreHelper)
      throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    ArrayList<RMApp> storedApps =
        createAndStoreApps(stateStoreHelper, store, 5);

    store.deleteStore();

    // none of the previously stored apps may be left
    for (int i = 0; i < storedApps.size(); i++) {
      RMApp storedApp = storedApps.get(i);
      assertFalse(stateStoreHelper.appExists(storedApp));
    }
  }

  /**
   * Stores two apps, removes the first by application id and asserts that
   * only the second is still reported as existing.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testRemoveApplication(RMStateStoreHelper stateStoreHelper)
      throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    final int appCount = 2;
    ArrayList<RMApp> storedApps =
        createAndStoreApps(stateStoreHelper, store, appCount);
    RMApp removedApp = storedApps.get(0);
    RMApp keptApp = storedApps.get(1);

    store.removeApplication(removedApp.getApplicationId());

    assertFalse(stateStoreHelper.appExists(removedApp));
    assertTrue(stateStoreHelper.appExists(keptApp));
  }

  /**
   * Stores one app with two attempts, removes the first attempt and checks,
   * both directly through the helper and after reloading the store, that
   * only the second attempt remains.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testRemoveAttempt(RMStateStoreHelper stateStoreHelper)
    throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    TestDispatcher dispatcher = new TestDispatcher();
    store.setRMDispatcher(dispatcher);

    ApplicationId appId = ApplicationId.newInstance(1383183339, 6);
    storeApp(store, appId, 123456, 564321);

    ApplicationAttemptId firstAttemptId =
        ApplicationAttemptId.newInstance(appId, 1);
    String firstContainerId =
        ContainerId.newContainerId(firstAttemptId, 1).toString();
    RMAppAttempt firstAttempt = storeAttempt(store, firstAttemptId,
        firstContainerId, null, null, dispatcher);

    ApplicationAttemptId secondAttemptId =
        ApplicationAttemptId.newInstance(appId, 2);
    String secondContainerId =
        ContainerId.newContainerId(secondAttemptId, 1).toString();
    RMAppAttempt secondAttempt = storeAttempt(store, secondAttemptId,
        secondContainerId, null, null, dispatcher);

    store.removeApplicationAttemptInternal(firstAttemptId);
    assertFalse(stateStoreHelper.attemptExists(firstAttempt));
    assertTrue(stateStoreHelper.attemptExists(secondAttempt));

    // let things settle down
    Thread.sleep(1000);
    store.close();

    // load state
    store = stateStoreHelper.getRMStateStore();
    RMState reloadedState = store.loadState();
    Map<ApplicationId, ApplicationStateData> reloadedApps =
        reloadedState.getApplicationState();

    // app is loaded
    ApplicationStateData reloadedApp = reloadedApps.get(appId);
    assertNotNull(reloadedApp);
    assertEquals(2, reloadedApp.getFirstAttemptId());
    assertNull(reloadedApp.getAttempt(firstAttemptId));
    assertNotNull(reloadedApp.getAttempt(secondAttemptId));
  }

  /**
   * Hook called by {@code testRMAppStateStore} after the store has been
   * closed for the first time and before its state is reloaded. Does nothing
   * here; subclasses may override it to alter the stored app state.
   *
   * @throws Exception if an overriding implementation fails
   */
  protected void modifyAppState() throws Exception {
    // intentionally empty
  }

  /**
   * Hook called by {@code testRMDTSecretManagerStateStore} right after the
   * delegation token has been stored. Does nothing here; subclasses may
   * override it to alter the stored token state.
   *
   * @throws Exception if an overriding implementation fails
   */
  protected void modifyRMDelegationTokenState() throws Exception {
    // intentionally empty
  }

  /**
   * Stores the AMRM token secret manager state twice, first with only a
   * current master key and then with a current and a next master key, and
   * checks after each reload that the keys come back unchanged. Finally
   * recovers the secret manager from the reloaded state and checks that it
   * holds the same secret keys.
   *
   * @param stateStoreHelper access to the store under test
   * @throws Exception if any store operation fails
   */
  public void testAMRMTokenSecretManagerStateStore(
      RMStateStoreHelper stateStoreHelper) throws Exception {
    LOG.info("Start testing");
    RMStateStore store = stateStoreHelper.getRMStateStore();
    TestDispatcher dispatcher = new TestDispatcher();
    store.setRMDispatcher(dispatcher);

    RMContext rmContext = mock(RMContext.class);
    when(rmContext.getStateStore()).thenReturn(store);
    Configuration conf = new YarnConfiguration();
    AMRMTokenSecretManager appTokenMgr =
        new AMRMTokenSecretManager(conf, rmContext);

    //create and save the first masterkey
    MasterKeyData firstMasterKeyData = appTokenMgr.createNewMasterKey();

    AMRMTokenSecretManagerState state1 =
        AMRMTokenSecretManagerState.newInstance(
          firstMasterKeyData.getMasterKey(), null);
    rmContext.getStateStore()
        .storeOrUpdateAMRMTokenSecretManager(state1,
      false);

    // load state
    store = stateStoreHelper.getRMStateStore();
    when(rmContext.getStateStore()).thenReturn(store);
    store.setRMDispatcher(dispatcher);
    RMState state = store.loadState();
    assertNotNull(state.getAMRMTokenSecretManagerState());
    assertEquals(firstMasterKeyData.getMasterKey(), state
      .getAMRMTokenSecretManagerState().getCurrentMasterKey());
    assertNull(state
      .getAMRMTokenSecretManagerState().getNextMasterKey());

    //create and save the second masterkey
    MasterKeyData secondMasterKeyData = appTokenMgr.createNewMasterKey();
    AMRMTokenSecretManagerState state2 =
        AMRMTokenSecretManagerState
          .newInstance(firstMasterKeyData.getMasterKey(),
            secondMasterKeyData.getMasterKey());
    rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManager(state2,
      true);

    // load state
    store = stateStoreHelper.getRMStateStore();
    when(rmContext.getStateStore()).thenReturn(store);
    store.setRMDispatcher(dispatcher);
    RMState state_2 = store.loadState();
    assertNotNull(state_2.getAMRMTokenSecretManagerState());
    assertEquals(firstMasterKeyData.getMasterKey(), state_2
      .getAMRMTokenSecretManagerState().getCurrentMasterKey());
    assertEquals(secondMasterKeyData.getMasterKey(), state_2
      .getAMRMTokenSecretManagerState().getNextMasterKey());

    // re-create the masterKeyData based on the recovered masterkey
    // should have the same secretKey
    appTokenMgr.recover(state_2);
    // expected value first, so a failure message labels the keys correctly
    assertEquals(firstMasterKeyData.getSecretKey(),
      appTokenMgr.getCurrnetMasterKeyData().getSecretKey());
    assertEquals(secondMasterKeyData.getSecretKey(),
      appTokenMgr.getNextMasterKeyData().getSecretKey());

    store.close();
  }

  /**
   * Exercises the reservation life cycle against the state store supplied by
   * {@code stateStoreHelper}: loading an empty store, storing a reservation,
   * replacing it, adding a second reservation while removing the first, and
   * finally removing the last reservation of a plan.
   *
   * <p>After every mutation the store is obtained again from the helper and
   * its state is reloaded, so each assertion runs against what
   * {@code loadState()} returns rather than against the objects built in
   * this test.
   *
   * @param stateStoreHelper helper that supplies the {@link RMStateStore}
   *          under test
   * @throws Exception if a store operation or a state load fails
   */
  public void testReservationStateStore(
      RMStateStoreHelper stateStoreHelper) throws Exception {
    RMStateStore store = stateStoreHelper.getRMStateStore();
    TestDispatcher dispatcher = new TestDispatcher();
    store.setRMDispatcher(dispatcher);

    RMContext rmContext = mock(RMContext.class);
    when(rmContext.getStateStore()).thenReturn(store);

    long ts = System.currentTimeMillis();
    ReservationId r1 = ReservationId.newInstance(ts, 1);
    int start = 1;
    int[] alloc = {10, 10, 10, 10, 10};
    ResourceCalculator res = new DefaultResourceCalculator();
    Resource minAlloc = Resource.newInstance(1024, 1);
    boolean hasGang = true;
    String planName = "dedicated";
    ReservationDefinition rDef =
        ReservationSystemTestUtil.createSimpleReservationDefinition(
            start, start + alloc.length + 1, alloc.length);
    ReservationAllocation allocation = new InMemoryReservationAllocation(
        r1, rDef, "u3", planName, 0, alloc.length,
        ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
        minAlloc, hasGang);
    ReservationAllocationStateProto allocationStateProto =
        ReservationSystemUtil.buildStateProto(allocation);
    // Sanity check: the proto built from the allocation matches the
    // allocation before anything is written to the store.
    assertAllocationStateEqual(allocation, allocationStateProto);

    // 1. Load empty store and verify no errors
    store = stateStoreHelper.getRMStateStore();
    when(rmContext.getStateStore()).thenReturn(store);
    store.setRMDispatcher(dispatcher);
    RMState state = store.loadState();
    Map<String, Map<ReservationId, ReservationAllocationStateProto>>
        reservationState = state.getReservationState();
    assertNotNull(reservationState);

    // 2. Store single reservation and verify
    String reservationIdName = r1.toString();
    rmContext.getStateStore().storeNewReservation(
        allocationStateProto, planName, reservationIdName);

    // load state and verify new state
    validateStoredReservation(
        stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
        allocationStateProto);

    // 3. update state test: the reservation is replaced by removing the old
    // entry and storing a new one under the same plan and reservation id
    alloc = new int[]{6, 6, 6};
    hasGang = false;
    allocation = new InMemoryReservationAllocation(
        r1, rDef, "u3", planName, 2, 2 + alloc.length,
        ReservationSystemTestUtil.generateAllocation(1L, 2L, alloc), res,
        minAlloc, hasGang);
    allocationStateProto =
        ReservationSystemUtil.buildStateProto(allocation);
    rmContext.getStateStore().removeReservation(planName, reservationIdName);
    rmContext.getStateStore().storeNewReservation(
        allocationStateProto, planName, reservationIdName);

    // load state and verify updated reservation
    validateStoredReservation(
        stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
        allocationStateProto);

    // 4. add a second one and remove the first one
    ReservationId r2 = ReservationId.newInstance(ts, 2);
    ReservationAllocation allocation2 = new InMemoryReservationAllocation(
        r2, rDef, "u3", planName, 0, alloc.length,
        ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
        minAlloc, hasGang);
    ReservationAllocationStateProto allocationStateProto2 =
        ReservationSystemUtil.buildStateProto(allocation2);
    String reservationIdName2 = r2.toString();

    rmContext.getStateStore().storeNewReservation(
        allocationStateProto2, planName, reservationIdName2);
    rmContext.getStateStore().removeReservation(planName, reservationIdName);

    // load state and verify r1 is removed and r2 is still there
    store = stateStoreHelper.getRMStateStore();
    when(rmContext.getStateStore()).thenReturn(store);
    store.setRMDispatcher(dispatcher);
    state = store.loadState();
    reservationState = state.getReservationState();
    assertNotNull(reservationState);
    Map<ReservationId, ReservationAllocationStateProto> reservations =
        reservationState.get(planName);
    assertNotNull(reservations);
    ReservationAllocationStateProto storedReservationAllocation =
        reservations.get(r1);
    assertNull(storedReservationAllocation,
        "Removed reservation should not be available in store");

    storedReservationAllocation = reservations.get(r2);
    // Fail with a clear message instead of a NullPointerException inside the
    // field-by-field comparison if r2 did not survive the reload.
    assertNotNull(storedReservationAllocation,
        "Reservation that was not removed should still be available in store");
    assertAllocationStateEqual(
        allocationStateProto2, storedReservationAllocation);
    assertAllocationStateEqual(allocation2, storedReservationAllocation);

    // 5. remove last reservation removes the plan state
    rmContext.getStateStore().removeReservation(planName, reservationIdName2);

    store = stateStoreHelper.getRMStateStore();
    when(rmContext.getStateStore()).thenReturn(store);
    store.setRMDispatcher(dispatcher);
    state = store.loadState();
    reservationState = state.getReservationState();
    assertNotNull(reservationState);
    reservations = reservationState.get(planName);
    assertNull(reservations);
  }

  /**
   * Round-trips a proxy CA certificate and private key through the state
   * store, then stores a second, different proxy CA and checks that a fresh
   * load returns the second one.
   *
   * @param stateStoreHelper helper that supplies the store under test
   * @throws Exception if initializing a proxy CA, storing it, or loading the
   *           state fails
   */
  public void testProxyCA(
      RMStateStoreHelper stateStoreHelper) throws Exception {
    RMStateStore stateStore = stateStoreHelper.getRMStateStore();
    TestDispatcher testDispatcher = new TestDispatcher();
    stateStore.setRMDispatcher(testDispatcher);

    // First CA: persist its certificate and private key.
    ProxyCA firstCa = new ProxyCA();
    firstCa.init();
    stateStore.storeProxyCACert(
        firstCa.getCaCert(), firstCa.getCaKeyPair().getPrivate());

    // The loaded state must hand back exactly what was stored.
    RMState loaded = stateStore.loadState();
    RMStateStore.ProxyCAState caState = loaded.getProxyCAState();
    assertEquals(firstCa.getCaCert(), caState.getCaCert());
    assertEquals(
        firstCa.getCaKeyPair().getPrivate(), caState.getCaPrivateKey());

    // Try replacing with a different ProxyCA
    ProxyCA secondCa = new ProxyCA();
    secondCa.init();
    // Guard against a vacuous test: the two CAs must actually differ.
    assertNotEquals(firstCa.getCaCert(), secondCa.getCaCert());
    assertNotEquals(
        firstCa.getCaKeyPair().getPrivate(),
        secondCa.getCaKeyPair().getPrivate());
    stateStore.storeProxyCACert(
        secondCa.getCaCert(), secondCa.getCaKeyPair().getPrivate());

    // After the second store, a fresh load must return the second CA.
    loaded = stateStore.loadState();
    caState = loaded.getProxyCAState();
    assertEquals(secondCa.getCaCert(), caState.getCaCert());
    assertEquals(
        secondCa.getCaKeyPair().getPrivate(), caState.getCaPrivateKey());
  }

  /**
   * Obtains the store from the helper, reloads its state, and checks that the
   * reservation {@code r1} under {@code planName} is present and matches both
   * the given allocation and its proto form.
   *
   * @param stateStoreHelper helper that supplies the store to load from
   * @param dispatcher dispatcher to attach to the store
   * @param rmContext mocked context whose {@code getStateStore()} is
   *          re-pointed at the store returned by the helper
   * @param r1 id of the reservation expected in the store
   * @param planName plan the reservation is expected under
   * @param allocation expected reservation in object form
   * @param allocationStateProto expected reservation in proto form
   * @throws Exception if loading the state fails
   */
  private void validateStoredReservation(
      RMStateStoreHelper stateStoreHelper, TestDispatcher dispatcher,
      RMContext rmContext, ReservationId r1, String planName,
      ReservationAllocation allocation,
      ReservationAllocationStateProto allocationStateProto) throws Exception {
    // Wire the store returned by the helper into the mock and the dispatcher.
    RMStateStore reloadedStore = stateStoreHelper.getRMStateStore();
    when(rmContext.getStateStore()).thenReturn(reloadedStore);
    reloadedStore.setRMDispatcher(dispatcher);

    Map<String, Map<ReservationId, ReservationAllocationStateProto>> byPlan =
        reloadedStore.loadState().getReservationState();
    assertNotNull(byPlan);

    Map<ReservationId, ReservationAllocationStateProto> planReservations =
        byPlan.get(planName);
    assertNotNull(planReservations);

    ReservationAllocationStateProto persisted = planReservations.get(r1);
    assertNotNull(persisted);

    // Compare against both representations of the expected reservation.
    assertAllocationStateEqual(allocationStateProto, persisted);
    assertAllocationStateEqual(allocation, persisted);
  }

  /**
   * Asserts that two reservation allocation protos agree on acceptance time,
   * start time, end time, gang flag, user, reservation definition and
   * allocation requests.
   *
   * <p>A null {@code actual} fails with an assertion error rather than a
   * {@link NullPointerException}, and every field comparison carries a
   * message naming the field that differs.
   *
   * @param expected proto holding the expected values
   * @param actual proto to verify
   */
  void assertAllocationStateEqual(
      ReservationAllocationStateProto expected,
      ReservationAllocationStateProto actual) {
    assertNotNull(actual, "Actual reservation allocation state is null");

    assertEquals(expected.getAcceptanceTime(), actual.getAcceptanceTime(),
        "Acceptance time mismatch");
    assertEquals(expected.getStartTime(), actual.getStartTime(),
        "Start time mismatch");
    assertEquals(expected.getEndTime(), actual.getEndTime(),
        "End time mismatch");
    assertEquals(expected.getContainsGangs(), actual.getContainsGangs(),
        "Contains-gangs flag mismatch");
    assertEquals(expected.getUser(), actual.getUser(),
        "User mismatch");
    assertEquals(
        expected.getReservationDefinition(), actual.getReservationDefinition(),
        "Reservation definition mismatch");
    assertEquals(expected.getAllocationRequestsList(),
        actual.getAllocationRequestsList(),
        "Allocation requests mismatch");
  }

  /**
   * Asserts that a reservation allocation proto matches an in-memory
   * {@code ReservationAllocation} on acceptance time, start time, end time,
   * gang flag, user, reservation definition and allocation requests.
   *
   * <p>The proto's reservation definition and allocation request list are
   * converted with {@code ReservationSystemUtil} before being compared. A
   * null {@code actual} fails with an assertion error rather than a
   * {@link NullPointerException}, and every field comparison carries a
   * message naming the field that differs.
   *
   * @param expected allocation holding the expected values
   * @param actual proto to verify
   */
  void assertAllocationStateEqual(
      ReservationAllocation expected,
      ReservationAllocationStateProto actual) {
    assertNotNull(actual, "Actual reservation allocation state is null");

    assertEquals(expected.getAcceptanceTime(), actual.getAcceptanceTime(),
        "Acceptance time mismatch");
    assertEquals(expected.getStartTime(), actual.getStartTime(),
        "Start time mismatch");
    assertEquals(expected.getEndTime(), actual.getEndTime(),
        "End time mismatch");
    assertEquals(expected.containsGangs(), actual.getContainsGangs(),
        "Contains-gangs flag mismatch");
    assertEquals(expected.getUser(), actual.getUser(),
        "User mismatch");
    assertEquals(
        expected.getReservationDefinition(),
        ReservationSystemUtil.convertFromProtoFormat(
            actual.getReservationDefinition()),
        "Reservation definition mismatch");
    assertEquals(
        expected.getAllocationRequests(),
        ReservationSystemUtil.toAllocations(
            actual.getAllocationRequestsList()),
        "Allocation requests mismatch");
  }
}