TestNMLeveldbStateStoreService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.recovery;

import static org.assertj.core.api.Assertions.assertThat;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class TestNMLeveldbStateStoreService {
  // Scratch directory backing the leveldb store. It lives under the
  // "test.build.data" system property, falling back to "java.io.tmpdir",
  // and is named after this test class.
  private static final File TMP_DIR = new File(
      System.getProperty("test.build.data",
          System.getProperty("java.io.tmpdir")),
      TestNMLeveldbStateStoreService.class.getName());

  // Configuration handed to every state store instance; built in setup().
  YarnConfiguration conf;
  // Store under test; replaced each time restartStateStore() runs.
  NMLeveldbStateStoreService stateStore;

  /**
   * Prepares each test: wipes the scratch directory, builds a configuration
   * with NM recovery enabled and pointed at that directory, then starts a
   * fresh state store.
   *
   * @throws IOException propagated from starting the state store
   */
  @Before
  public void setup() throws IOException {
    FileUtil.fullyDelete(TMP_DIR);

    final YarnConfiguration recoveryConf = new YarnConfiguration();
    recoveryConf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
    recoveryConf.set(YarnConfiguration.NM_RECOVERY_DIR, TMP_DIR.toString());
    conf = recoveryConf;

    restartStateStore();
  }

  /**
   * Tears down after each test: closes the state store if one was created
   * and removes the scratch directory.
   *
   * The directory removal sits in a {@code finally} block so that a failure
   * while closing the store cannot leave stale leveldb files behind for the
   * next test.
   *
   * @throws IOException if closing the state store fails
   */
  @After
  public void cleanup() throws IOException {
    try {
      if (stateStore != null) {
        stateStore.close();
      }
    } finally {
      // always clear the on-disk state, even when close() threw
      FileUtil.fullyDelete(TMP_DIR);
    }
  }

  /**
   * Drains a container recovery iterator into a list, keeping the order in
   * which the iterator yields its elements.
   *
   * @param it iterator over recovered container states
   * @return every element the iterator produced
   * @throws IOException propagated from the iterator
   */
  private List<RecoveredContainerState> loadContainersState(
      RecoveryIterator<RecoveredContainerState> it) throws IOException {
    final List<RecoveredContainerState> recovered = new ArrayList<>();
    while (it.hasNext()) {
      recovered.add(it.next());
    }
    return recovered;
  }

  /**
   * Drains an application recovery iterator into a list, keeping the order
   * in which the iterator yields its elements.
   *
   * @param it iterator over recovered application protos
   * @return every proto the iterator produced
   * @throws IOException propagated from the iterator
   */
  private List<ContainerManagerApplicationProto> loadApplicationProtos(
      RecoveryIterator<ContainerManagerApplicationProto> it)
      throws IOException {
    final List<ContainerManagerApplicationProto> collected =
        new ArrayList<>();
    while (it.hasNext()) {
      final ContainerManagerApplicationProto proto = it.next();
      collected.add(proto);
    }
    return collected;
  }

  /**
   * Drains a deletion-task recovery iterator into a list, keeping the order
   * in which the iterator yields its elements.
   *
   * @param it iterator over recovered deletion task protos
   * @return every proto the iterator produced
   * @throws IOException propagated from the iterator
   */
  private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos(
      RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException {
    final List<DeletionServiceDeleteTaskProto> collected = new ArrayList<>();
    while (it.hasNext()) {
      final DeletionServiceDeleteTaskProto task = it.next();
      collected.add(task);
    }
    return collected;
  }

  /**
   * Copies every entry yielded by the iterator into a hash map. If the
   * iterator yields the same key more than once, the later value wins.
   *
   * @param it iterator over (key, recovered user resources) entries
   * @return map holding all entries the iterator produced
   * @throws IOException propagated from the iterator
   */
  private Map<String, RecoveredUserResources> loadUserResources(
      RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it)
      throws IOException {
    final Map<String, RecoveredUserResources> collected = new HashMap<>();
    while (it.hasNext()) {
      final Map.Entry<String, RecoveredUserResources> current = it.next();
      final String key = current.getKey();
      final RecoveredUserResources value = current.getValue();
      collected.put(key, value);
    }
    return collected;
  }

  /**
   * Copies every (application attempt, master key) entry yielded by the
   * iterator into a hash map. If the iterator yields the same attempt more
   * than once, the later key wins.
   *
   * @param it iterator over recovered NM token entries
   * @return map holding all entries the iterator produced
   * @throws IOException propagated from the iterator
   */
  private Map<ApplicationAttemptId, MasterKey> loadNMTokens(
      RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it)
      throws IOException {
    final Map<ApplicationAttemptId, MasterKey> collected = new HashMap<>();
    while (it.hasNext()) {
      final Map.Entry<ApplicationAttemptId, MasterKey> current = it.next();
      final ApplicationAttemptId attempt = current.getKey();
      final MasterKey key = current.getValue();
      collected.put(attempt, key);
    }
    return collected;
  }

  /**
   * Copies every (container id, long) entry yielded by the iterator into a
   * hash map. If the iterator yields the same container id more than once,
   * the later value wins.
   *
   * @param it iterator over recovered container token entries
   * @return map holding all entries the iterator produced
   * @throws IOException propagated from the iterator
   */
  private Map<ContainerId, Long> loadContainerTokens(
      RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException {
    final Map<ContainerId, Long> collected = new HashMap<>();
    while (it.hasNext()) {
      final Map.Entry<ContainerId, Long> current = it.next();
      final ContainerId id = current.getKey();
      final Long value = current.getValue();
      collected.put(id, value);
    }
    return collected;
  }

  /**
   * Drains an iterator of completed (localized) resources into a list. A
   * {@code null} iterator is tolerated and yields an empty list.
   *
   * @param it iterator over localized resource protos, may be {@code null}
   * @return every proto the iterator produced, in iteration order
   * @throws IOException propagated from the iterator
   */
  private List<LocalizedResourceProto> loadCompletedResources(
      RecoveryIterator<LocalizedResourceProto> it) throws IOException {
    final List<LocalizedResourceProto> collected = new ArrayList<>();
    if (it == null) {
      return collected;
    }
    while (it.hasNext()) {
      final LocalizedResourceProto resource = it.next();
      collected.add(resource);
    }
    return collected;
  }

  /**
   * Copies every (resource proto, path) entry yielded by the iterator into
   * a hash map. A {@code null} iterator is tolerated and yields an empty
   * map.
   *
   * @param it iterator over started-resource entries, may be {@code null}
   * @return map holding all entries the iterator produced
   * @throws IOException propagated from the iterator
   */
  private Map<LocalResourceProto, Path> loadStartedResources(
      RecoveryIterator <Map.Entry<LocalResourceProto, Path>> it)
      throws IOException {
    final Map<LocalResourceProto, Path> collected = new HashMap<>();
    if (it == null) {
      return collected;
    }
    while (it.hasNext()) {
      final Map.Entry<LocalResourceProto, Path> current = it.next();
      final LocalResourceProto resource = current.getKey();
      final Path localPath = current.getValue();
      collected.put(resource, localPath);
    }
    return collected;
  }

  /**
   * Replaces the state store under test with a newly constructed one that
   * is initialized with {@code conf} and started. Any existing store is
   * closed first.
   *
   * The field is reassigned before {@code init} runs, so it refers to the
   * new instance even when initialization or startup throws.
   *
   * @throws IOException propagated from closing the previous store
   */
  private void restartStateStore() throws IOException {
    final NMLeveldbStateStoreService previous = stateStore;
    if (previous != null) {
      // leveldb only releases the database lock once the store is closed
      previous.close();
    }

    final NMLeveldbStateStoreService replacement =
        new NMLeveldbStateStoreService();
    stateStore = replacement;
    replacement.init(conf);
    replacement.start();
  }

  /**
   * Asserts that the store's localization state exists but is empty: the
   * public tracker has no completed and no started resources, and the
   * state's own iterator yields no entries.
   *
   * @throws IOException propagated from loading the localization state
   */
  private void verifyEmptyState() throws IOException {
    final RecoveredLocalizationState localization =
        stateStore.loadLocalizationState();
    assertNotNull(localization);

    final LocalResourceTrackerState publicTracker =
        localization.getPublicTrackerState();
    assertNotNull(publicTracker);

    final List<LocalizedResourceProto> completed = loadCompletedResources(
        publicTracker.getCompletedResourcesIterator());
    assertTrue(completed.isEmpty());

    final Map<LocalResourceProto, Path> started = loadStartedResources(
        publicTracker.getStartedResourcesIterator());
    assertTrue(started.isEmpty());

    final Map<String, RecoveredUserResources> remaining =
        loadUserResources(localization.getIterator());
    assertTrue(remaining.isEmpty());
  }

  /**
   * The store created by setup() must report itself as newly created; the
   * store obtained after a restart over the same directory must not.
   */
  @Test
  public void testIsNewlyCreated() throws IOException {
    final boolean newlyCreatedAtFirst = stateStore.isNewlyCreated();
    assertTrue(newlyCreatedAtFirst);

    restartStateStore();

    final boolean newlyCreatedAfterRestart = stateStore.isNewlyCreated();
    assertFalse(newlyCreatedAfterRestart);
  }

  /**
   * A fresh store must report that it can recover and must expose an empty
   * localization state.
   */
  @Test
  public void testEmptyState() throws IOException {
    final boolean recoverable = stateStore.canRecover();
    assertTrue(recoverable);
    verifyEmptyState();
  }
  
  /**
   * Exercises version handling across restarts: the current version is
   * stored by default, a version with a higher minor number is accepted on
   * restart, and a version with a higher major number makes the restart
   * fail with a ServiceStateException.
   */
  @Test
  public void testCheckVersion() throws IOException {
    // a fresh store holds its own current version
    final Version current = stateStore.getCurrentVersion();
    assertEquals(current, stateStore.loadVersion());

    // same major, higher minor: compatible
    final Version newerMinor = Version.newInstance(
        current.getMajorVersion(), current.getMinorVersion() + 2);
    stateStore.storeVersion(newerMinor);
    assertEquals(newerMinor, stateStore.loadVersion());
    restartStateStore();
    // after the restart the stored version is expected to be back at the
    // current version
    assertEquals(current, stateStore.loadVersion());

    // higher major: incompatible, so the restart has to fail
    final Version newerMajor = Version.newInstance(
        current.getMajorVersion() + 1, current.getMinorVersion());
    stateStore.storeVersion(newerMajor);
    try {
      restartStateStore();
      Assert.fail("Incompatible version, should expect fail here.");
    } catch (ServiceStateException e) {
      final String message = e.getMessage();
      assertTrue("Exception message mismatch",
          message.contains("Incompatible version for NM state:"));
    }
  }

  /**
   * Stores, recovers and removes application records, restarting the store
   * between steps so that every check reads state that was persisted.
   */
  @Test
  public void testApplicationStorage() throws IOException {
    // nothing stored yet, so nothing may be recovered
    List<ContainerManagerApplicationProto> recovered = loadApplicationProtos(
        stateStore.loadApplicationsState().getIterator());
    assertTrue(recovered.isEmpty());

    // a single stored application comes back after a restart
    final ApplicationId firstId = ApplicationId.newInstance(1234, 1);
    final ContainerManagerApplicationProto firstProto =
        ContainerManagerApplicationProto.newBuilder()
            .setId(((ApplicationIdPBImpl) firstId).getProto())
            .setUser("user1")
            .build();
    stateStore.storeApplication(firstId, firstProto);
    restartStateStore();
    recovered = loadApplicationProtos(
        stateStore.loadApplicationsState().getIterator());
    assertEquals(1, recovered.size());
    assertEquals(firstProto, recovered.get(0));

    // a second application is recovered alongside the first
    final ApplicationId secondId = ApplicationId.newInstance(1234, 2);
    final ContainerManagerApplicationProto secondProto =
        ContainerManagerApplicationProto.newBuilder()
            .setId(((ApplicationIdPBImpl) secondId).getProto())
            .setUser("user2")
            .build();
    stateStore.storeApplication(secondId, secondProto);
    restartStateStore();
    recovered = loadApplicationProtos(
        stateStore.loadApplicationsState().getIterator());
    assertEquals(2, recovered.size());
    assertTrue(recovered.contains(firstProto));
    assertTrue(recovered.contains(secondProto));

    // removing the second application leaves only the first
    stateStore.removeApplication(secondId);
    restartStateStore();
    recovered = loadApplicationProtos(
        stateStore.loadApplicationsState().getIterator());
    assertEquals(1, recovered.size());
    assertEquals(firstProto, recovered.get(0));
  }


  /**
   * A container that was only stored (no further transitions recorded) must
   * be recovered in the REQUESTED state with its original request, start
   * time and resource, an invalid exit code, no kill flag and no
   * diagnostics.
   */
  @Test
  public void testContainerStorageWhenContainerIsRequested()
      throws IOException {
    final ContainerStateConstructParams stored = storeContainerInStateStore();
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(0, recovered.getVersion());
    final long expectedStartTime =
        stored.getContainerStartTime().longValue();
    assertEquals(expectedStartTime, recovered.getStartTime());
    assertEquals(RecoveredContainerStatus.REQUESTED, recovered.getStatus());
    assertEquals(ContainerExitStatus.INVALID, recovered.getExitCode());
    assertFalse(recovered.getKilled());
    assertEquals(stored.getContainerRequest(), recovered.getStartRequest());
    assertTrue(recovered.getDiagnostics().isEmpty());
    assertEquals(stored.getContainerResource(), recovered.getCapability());
  }



  /**
   * A stored container that was subsequently marked as queued must be
   * recovered in the QUEUED state with its request and resource intact.
   */
  @Test
  public void testContainerStorageWhenContainerIsQueued()
      throws IOException {
    final ContainerStateConstructParams stored = storeContainerInStateStore();

    storeNewContainerRecordWithoutStartContainerRequest(
        stored.getAppAttemptId());

    stateStore.storeContainerQueued(stored.getContainerId());
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(RecoveredContainerStatus.QUEUED, recovered.getStatus());
    assertEquals(ContainerExitStatus.INVALID, recovered.getExitCode());
    assertFalse(recovered.getKilled());
    assertEquals(stored.getContainerRequest(), recovered.getStartRequest());
    assertTrue(recovered.getDiagnostics().isEmpty());
    assertEquals(stored.getContainerResource(), recovered.getCapability());
  }

  /**
   * A stored container that was queued and then launched with diagnostics
   * must be recovered in the LAUNCHED state carrying those diagnostics.
   */
  @Test
  public void testContainerStorageWhenContainerIsLaunched()
      throws IOException {
    final ContainerStateConstructParams stored = storeContainerInStateStore();
    final ContainerId id = stored.getContainerId();

    storeNewContainerRecordWithoutStartContainerRequest(
        stored.getAppAttemptId());
    stateStore.storeContainerQueued(id);

    final StringBuilder expectedDiags = launchContainerWithDiagnostics(id);
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(RecoveredContainerStatus.LAUNCHED, recovered.getStatus());
    assertEquals(ContainerExitStatus.INVALID, recovered.getExitCode());
    assertFalse(recovered.getKilled());
    assertEquals(stored.getContainerRequest(), recovered.getStartRequest());
    assertEquals(expectedDiags.toString(), recovered.getDiagnostics());
    assertEquals(stored.getContainerResource(), recovered.getCapability());
  }

  /**
   * A stored, queued container that is marked as paused must be recovered
   * in the PAUSED state. Once the paused marker is removed again, the
   * container must still be recovered but no longer as PAUSED.
   */
  @Test
  public void testContainerStorageWhenContainerIsPaused()
      throws IOException {
    ContainerStateConstructParams containerParams =
        storeContainerInStateStore();
    ContainerId containerId = containerParams.getContainerId();
    StartContainerRequest containerReq = containerParams.getContainerRequest();
    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();

    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
    stateStore.storeContainerQueued(containerId);

    stateStore.storeContainerPaused(containerId);
    restartStateStore();

    List<RecoveredContainerState> recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, recoveredContainers.size());
    RecoveredContainerState rcs = recoveredContainers.get(0);
    assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
    assertFalse(rcs.getKilled());
    assertEquals(containerReq, rcs.getStartRequest());

    // Resume the container
    stateStore.removeContainerPaused(containerId);
    restartStateStore();
    recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, recoveredContainers.size());
    // checking the count alone would also pass if the paused marker were
    // still present, so verify the recovered status actually changed
    assertNotEquals(RecoveredContainerStatus.PAUSED,
        recoveredContainers.get(0).getStatus());
  }

  /**
   * After an update token carrying a larger resource is stored for a
   * launched container, the recovered container must report that larger
   * resource as its capability.
   */
  @Test
  public void testContainerStorageWhenContainerSizeIncreased()
      throws IOException {
    final ContainerStateConstructParams stored = storeContainerInStateStore();
    final ContainerId id = stored.getContainerId();

    storeNewContainerRecordWithoutStartContainerRequest(
        stored.getAppAttemptId());
    stateStore.storeContainerQueued(id);
    launchContainerWithDiagnostics(id);

    increaseContainerSize(id);
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(0, recovered.getVersion());
    assertEquals(RecoveredContainerStatus.LAUNCHED, recovered.getStatus());
    assertEquals(ContainerExitStatus.INVALID, recovered.getExitCode());
    assertFalse(recovered.getKilled());
    final Resource expectedCapability = Resource.newInstance(2468, 4);
    assertEquals(expectedCapability, recovered.getCapability());
  }

  /**
   * A launched, resized container that is then marked as killed must be
   * recovered as LAUNCHED with the kill flag set, the accumulated
   * diagnostics, and a start request whose token matches the update token.
   */
  @Test
  public void testContainerStorageWhenContainerMarkedAsKilled()
      throws IOException {
    final ContainerStateConstructParams stored = storeContainerInStateStore();
    final ContainerId id = stored.getContainerId();

    storeNewContainerRecordWithoutStartContainerRequest(
        stored.getAppAttemptId());
    stateStore.storeContainerQueued(id);
    final StringBuilder expectedDiags = launchContainerWithDiagnostics(id);
    final ContainerTokenIdentifier expectedToken = increaseContainerSize(id);

    markContainerAsKilled(id, expectedDiags);
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(RecoveredContainerStatus.LAUNCHED, recovered.getStatus());
    assertEquals(ContainerExitStatus.INVALID, recovered.getExitCode());
    assertTrue(recovered.getKilled());
    final Token recoveredToken =
        recovered.getStartRequest().getContainerToken();
    final ContainerTokenIdentifier recoveredTokenId =
        BuilderUtils.newContainerTokenIdentifier(recoveredToken);
    assertEquals(expectedToken, recoveredTokenId);
    assertEquals(expectedDiags.toString(), recovered.getDiagnostics());
  }

  /**
   * A launched and killed container that is finally stored as completed
   * with exit code 21 must be recovered as COMPLETED with that exit code,
   * the kill flag set, and all diagnostics accumulated so far.
   */
  @Test
  public void testContainerStorageWhenContainerCompleted()
      throws IOException {
    final ContainerStateConstructParams stored = storeContainerInStateStore();
    final ContainerId id = stored.getContainerId();

    storeNewContainerRecordWithoutStartContainerRequest(
        stored.getAppAttemptId());
    stateStore.storeContainerQueued(id);
    final StringBuilder expectedDiags = launchContainerWithDiagnostics(id);
    markContainerAsKilled(id, expectedDiags);

    // one more round of diagnostics, then record completion
    expectedDiags.append("some final diags");
    stateStore.storeContainerDiagnostics(id, expectedDiags);
    stateStore.storeContainerCompleted(id, 21);
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(RecoveredContainerStatus.COMPLETED, recovered.getStatus());
    assertEquals(21, recovered.getExitCode());
    assertTrue(recovered.getKilled());
    assertEquals(expectedDiags.toString(), recovered.getDiagnostics());
  }

  /**
   * The remaining retry attempts, work dir and log dir stored for a
   * container must be recovered after a restart; restart times are checked
   * afterwards through validateRetryAttempts.
   */
  @Test
  public void testContainerStorage() throws IOException {
    final ContainerId id = storeContainerInStateStore().getContainerId();

    // persist retry count plus work and log directories
    stateStore.storeContainerRemainingRetryAttempts(id, 6);
    stateStore.storeContainerWorkDir(id, "/test/workdir");
    stateStore.storeContainerLogDir(id, "/test/logdir");
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, afterRestart.size());

    final RecoveredContainerState recovered = afterRestart.get(0);
    assertEquals(6, recovered.getRemainingRetryAttempts());
    assertEquals("/test/workdir", recovered.getWorkDir());
    assertEquals("/test/logdir", recovered.getLogDir());
    validateRetryAttempts(id);
  }

  /**
   * A removed container must not be recovered after a restart, and loading
   * containers through a spy after another restart must not trigger any
   * removeContainer call on that spy.
   */
  @Test
  public void testContainerStorageWhenContainerRemoved()
      throws IOException {
    final ContainerId id = storeContainerInStateStore().getContainerId();

    // once removed, the container must be gone after a restart
    stateStore.removeContainer(id);
    restartStateStore();
    final List<RecoveredContainerState> afterRemoval =
        loadContainersState(stateStore.getContainerStateIterator());
    assertTrue(afterRemoval.isEmpty());

    // recover again to check remove clears all containers
    restartStateStore();
    final NMStateStoreService spiedStore = spy(stateStore);
    loadContainersState(spiedStore.getContainerStateIterator());
    verify(spiedStore, times(0)).removeContainer(any(ContainerId.class));
  }

  /**
   * Verifies the store holds no containers, then stores one container
   * (application 1234/3, attempt 4, container 5) with version 0 and a fixed
   * start time, and checks that no version key was written for it.
   *
   * @return the ids, request, resource and start time used for the stored
   *         container
   * @throws IOException propagated from the state store
   */
  private ContainerStateConstructParams storeContainerInStateStore()
      throws IOException {
    // the store must start out without any containers
    final List<RecoveredContainerState> preexisting =
        loadContainersState(stateStore.getContainerStateIterator());
    assertTrue(preexisting.isEmpty());

    // build the ids and the start request for the container
    final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(1234, 3), 4);
    final ContainerId id = ContainerId.newContainerId(attemptId, 5);
    final Resource resource = Resource.newInstance(1024, 2);
    final StartContainerRequest request =
        createContainerRequest(id, resource);

    final long startTime = 1573155078494L;
    stateStore.storeContainer(id, 0, startTime, request);

    // verify the container version key is not stored for new containers
    final String versionKey =
        stateStore.getContainerVersionKey(id.toString());
    final DB db = stateStore.getDB();
    assertNull("version key present for new container",
        db.get(bytes(versionKey)));

    final ContainerStateConstructParams params =
        new ContainerStateConstructParams();
    params.setContainerRequest(request);
    params.setContainerResource(resource);
    params.setContainerStartTime(startTime);
    params.setAppAttemptId(attemptId);
    params.setContainerId(id);
    return params;
  }

  /**
   * Mutable holder for the values used when a container is stored by a
   * test, so that assertions can later compare them with what was
   * recovered. Every setter returns this instance to allow chaining.
   */
  private static class ContainerStateConstructParams {
    private ContainerId containerId;
    private ApplicationAttemptId appAttemptId;
    private StartContainerRequest containerRequest;
    private Resource containerResource;
    private Long containerStartTime;

    /** @return the container id, or null if none was set */
    public ContainerId getContainerId() {
      return this.containerId;
    }

    /** @return the application attempt id, or null if none was set */
    public ApplicationAttemptId getAppAttemptId() {
      return this.appAttemptId;
    }

    /** @return the start request, or null if none was set */
    public StartContainerRequest getContainerRequest() {
      return this.containerRequest;
    }

    /** @return the container resource, or null if none was set */
    public Resource getContainerResource() {
      return this.containerResource;
    }

    /** @return the container start time, or null if none was set */
    public Long getContainerStartTime() {
      return this.containerStartTime;
    }

    /** Records the container id and returns this holder. */
    public ContainerStateConstructParams setContainerId(
        ContainerId theContainerId) {
      containerId = theContainerId;
      return this;
    }

    /** Records the application attempt id and returns this holder. */
    public ContainerStateConstructParams setAppAttemptId(
        ApplicationAttemptId theAppAttemptId) {
      appAttemptId = theAppAttemptId;
      return this;
    }

    /** Records the start request and returns this holder. */
    public ContainerStateConstructParams setContainerRequest(
        StartContainerRequest theContainerRequest) {
      containerRequest = theContainerRequest;
      return this;
    }

    /** Records the container resource and returns this holder. */
    public ContainerStateConstructParams setContainerResource(
        Resource theContainerResource) {
      containerResource = theContainerResource;
      return this;
    }

    /** Records the container start time and returns this holder. */
    public ContainerStateConstructParams setContainerStartTime(
        Long theContainerStartTime) {
      containerStartTime = theContainerStartTime;
      return this;
    }
  }

  /**
   * Appends a further diagnostics message to {@code diags}, stores the
   * resulting diagnostics for the container, and then records the
   * container as killed.
   *
   * @param containerId container to update
   * @param diags diagnostics buffer; modified in place
   * @throws IOException propagated from the state store
   */
  private void markContainerAsKilled(ContainerId containerId,
      StringBuilder diags) throws IOException {
    // append() returns the same builder, so the caller's buffer is both
    // extended and the one that gets stored
    final StringBuilder extended =
        diags.append("some more diags for container");
    stateStore.storeContainerDiagnostics(containerId, extended);
    stateStore.storeContainerKilled(containerId);
  }

  /**
   * Stores an update token for the container whose identifier carries
   * {@code Resource.newInstance(2468, 4)}.
   *
   * @param containerId container to update
   * @return the token identifier that was stored
   * @throws IOException propagated from the state store
   */
  private ContainerTokenIdentifier increaseContainerSize(
      ContainerId containerId) throws IOException {
    final Resource enlarged = Resource.newInstance(2468, 4);
    final Priority priority = Priority.newInstance(7);
    final ContainerTokenIdentifier resizedToken =
        new ContainerTokenIdentifier(containerId, "host", "user", enlarged,
            9876543210L, 42, 2468, priority, 13579);
    stateStore.storeContainerUpdateToken(containerId, resizedToken);
    return resizedToken;
  }

  /**
   * Records the container as launched and then stores an initial
   * diagnostics message for it.
   *
   * @param containerId container to update
   * @return the diagnostics buffer that was stored, for further appending
   * @throws IOException propagated from the state store
   */
  private StringBuilder launchContainerWithDiagnostics(ContainerId containerId)
      throws IOException {
    stateStore.storeContainerLaunched(containerId);
    final StringBuilder initialDiags =
        new StringBuilder("some diags for container");
    stateStore.storeContainerDiagnostics(containerId, initialDiags);
    return initialDiags;
  }

  /**
   * Records container 6 of the given attempt as launched without ever
   * storing a start request for it, then asserts that the container
   * iterator still yields exactly one container.
   *
   * @param appAttemptId attempt the extra container id is derived from
   * @throws IOException propagated from the state store
   */
  private void storeNewContainerRecordWithoutStartContainerRequest(
      ApplicationAttemptId appAttemptId) throws IOException {
    // this record has no StartContainerRequest behind it
    final ContainerId requestless =
        ContainerId.newContainerId(appAttemptId, 6);
    stateStore.storeContainerLaunched(requestless);

    // check whether the new container record is discarded
    final int recoveredCount =
        loadContainersState(stateStore.getContainerStateIterator()).size();
    assertEquals(1, recoveredCount);
  }

  /**
   * Stores three restart times for the container, restarts the store, and
   * checks that the first recovered container reports the same three
   * values at the same positions.
   *
   * @param containerId container whose restart times are stored
   * @throws IOException propagated from the state store
   */
  private void validateRetryAttempts(ContainerId containerId)
      throws IOException {
    final List<Long> expectedTimes = Arrays.asList(1462700529039L,
        1462700529050L, 1462700529120L);
    stateStore.storeContainerRestartTimes(containerId, expectedTimes);
    restartStateStore();

    final List<RecoveredContainerState> afterRestart =
        loadContainersState(stateStore.getContainerStateIterator());
    final List<Long> actualTimes = afterRestart.get(0).getRestartTimes();
    // compare position by position as primitive longs
    for (int i = 0; i < expectedTimes.size(); i++) {
      final long expected = expectedTimes.get(i);
      final long actual = actualTimes.get(i);
      assertEquals(expected, actual);
    }
  }

  /**
   * Builds a start request for the given container by delegating to
   * createContainerRequestInternal.
   *
   * @param containerId id placed in the request's container token
   * @param res resource placed in the token; passed through unchanged
   * @return the start request built by the delegate
   */
  private StartContainerRequest createContainerRequest(
          ContainerId containerId, Resource res) {
    final StartContainerRequest request =
        createContainerRequestInternal(containerId, res);
    return request;
  }

  /**
   * Builds a start request from fixed test data: a launch context with one
   * local resource, one environment variable, two command strings, one
   * service-data entry, token bytes and two ACL entries, plus a container
   * token whose identifier carries the given container id and resource.
   *
   * @param containerId id placed in the container token identifier
   * @param res resource placed in the identifier; when {@code null},
   *        {@code Resource.newInstance(1357, 3)} is used instead
   * @return the assembled start request
   */
  private StartContainerRequest createContainerRequestInternal(ContainerId
          containerId, Resource res) {
    // launch context pieces
    final Map<String, LocalResource> resources = new HashMap<>();
    resources.put("rsrc", LocalResource.newInstance(
        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
        1234567890L));

    final Map<String, String> environment = new HashMap<>();
    environment.put("somevar", "someval");

    final List<String> commands = new ArrayList<>();
    commands.add("somecmd");
    commands.add("somearg");

    final Map<String, ByteBuffer> services = new HashMap<>();
    services.put("someservice",
        ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));

    final ByteBuffer tokens =
        ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});

    final Map<ApplicationAccessType, String> accessControl = new HashMap<>();
    accessControl.put(ApplicationAccessType.VIEW_APP, "viewuser");
    accessControl.put(ApplicationAccessType.MODIFY_APP, "moduser");

    final ContainerLaunchContext launchContext =
        ContainerLaunchContext.newInstance(resources, environment, commands,
            services, tokens, accessControl);

    // container token: use the caller's resource when one was supplied
    final Resource tokenResource =
        (res != null) ? res : Resource.newInstance(1357, 3);
    final ContainerTokenIdentifier tokenId =
        new ContainerTokenIdentifier(containerId, "host", "user",
            tokenResource, 9876543210L, 42, 2468, Priority.newInstance(7),
            13579);
    final Token token = Token.newInstance(tokenId.getBytes(),
        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
        "tokenservice");

    return StartContainerRequest.newInstance(launchContext, token);
  }

  /**
   * Stores completed application resources for two users (two applications
   * for the first user, one for the second), restarts the store, and checks
   * that the recovered state groups them by user and by application.
   */
  @Test
  public void testLocalTrackerStateIterator() throws IOException {
    String user1 = "somebody";
    ApplicationId appId1 = ApplicationId.newInstance(1, 1);
    ApplicationId appId2 = ApplicationId.newInstance(2, 2);

    String user2 = "someone";
    ApplicationId appId3 = ApplicationId.newInstance(3, 3);

    // start and finish local resource for applications
    Path appRsrcPath1 = new Path("hdfs://some/app/resource1");
    LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(appRsrcPath1),
            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
            123L, 456L);
    LocalResourceProto appRsrcProto1 = rsrcPb1.getProto();
    Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1");
    Path appRsrcPath2 = new Path("hdfs://some/app/resource2");
    LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(appRsrcPath2),
            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
            123L, 456L);
    LocalResourceProto appRsrcProto2 = rsrcPb2.getProto();
    Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2");
    Path appRsrcPath3 = new Path("hdfs://some/app/resource3");
    LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(appRsrcPath3),
            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
            123L, 456L);
    LocalResourceProto appRsrcProto3 = rsrcPb3.getProto();
    // each resource gets its own local directory
    Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc3");

    stateStore.startResourceLocalization(user1, appId1, appRsrcProto1,
        appRsrcLocalPath1);
    stateStore.startResourceLocalization(user1, appId2, appRsrcProto2,
        appRsrcLocalPath2);
    stateStore.startResourceLocalization(user2, appId3, appRsrcProto3,
        appRsrcLocalPath3);

    LocalizedResourceProto appLocalizedProto1 =
        LocalizedResourceProto.newBuilder()
            .setResource(appRsrcProto1)
            .setLocalPath(appRsrcLocalPath1.toString())
            .setSize(1234567L)
            .build();
    LocalizedResourceProto appLocalizedProto2 =
        LocalizedResourceProto.newBuilder()
            .setResource(appRsrcProto2)
            .setLocalPath(appRsrcLocalPath2.toString())
            .setSize(1234567L)
            .build();
    LocalizedResourceProto appLocalizedProto3 =
        LocalizedResourceProto.newBuilder()
            .setResource(appRsrcProto3)
            .setLocalPath(appRsrcLocalPath3.toString())
            .setSize(1234567L)
            .build();

    stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1);
    stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2);
    stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3);

    // restart and verify two users exist with their completed app resources
    restartStateStore();
    RecoveredLocalizationState state = stateStore.loadLocalizationState();
    Map<String, RecoveredUserResources> userResources =
        loadUserResources(state.getIterator());
    assertEquals(2, userResources.size());

    // user1: two applications, one completed resource each
    RecoveredUserResources uResource = userResources.get(user1);
    assertNotNull(uResource);
    assertEquals(2, uResource.getAppTrackerStates().size());
    LocalResourceTrackerState app1ts =
        uResource.getAppTrackerStates().get(appId1);
    assertNotNull(app1ts);
    List<LocalizedResourceProto> completedResources = loadCompletedResources(
        app1ts.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
        app1ts.getStartedResourcesIterator());
    assertTrue(startedResources.isEmpty());
    assertEquals(1, completedResources.size());
    assertEquals(appLocalizedProto1,
        completedResources.iterator().next());
    LocalResourceTrackerState app2ts =
        uResource.getAppTrackerStates().get(appId2);
    assertNotNull(app2ts);
    completedResources = loadCompletedResources(
        app2ts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        app2ts.getStartedResourcesIterator());
    assertTrue(startedResources.isEmpty());
    assertEquals(1, completedResources.size());
    assertEquals(appLocalizedProto2,
        completedResources.iterator().next());

    // user2: a single application with one completed resource
    RecoveredUserResources uResource2 = userResources.get(user2);
    assertNotNull(uResource2);
    assertEquals(1, uResource2.getAppTrackerStates().size());
    LocalResourceTrackerState app3ts =
        uResource2.getAppTrackerStates().get(appId3);
    assertNotNull(app3ts);
    completedResources = loadCompletedResources(
        app3ts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        app3ts.getStartedResourcesIterator());
    assertTrue(startedResources.isEmpty());
    assertEquals(1, completedResources.size());
    assertEquals(appLocalizedProto3,
        completedResources.iterator().next());
  }

  /**
   * Starts localization of a single application-scoped resource and checks
   * that, after a restart, it is recovered as in-progress under the owning
   * user's application tracker while the public and private trackers stay
   * empty.
   */
  @Test
  public void testStartResourceLocalizationForApplicationResource()
      throws IOException {
    String user = "somebody";
    ApplicationId appId = ApplicationId.newInstance(1, 1);

    // start a local resource for an application
    Path appRsrcPath = new Path("hdfs://some/app/resource");
    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(appRsrcPath),
            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
            123L, 456L);
    LocalResourceProto appRsrcProto = rsrcPb.getProto();
    Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
    stateStore.startResourceLocalization(user, appId, appRsrcProto,
        appRsrcLocalPath);

    // restart and verify only app resource is marked in-progress
    restartStateStore();
    RecoveredLocalizationState state = stateStore.loadLocalizationState();

    // public tracker holds nothing
    LocalResourceTrackerState pubts = state.getPublicTrackerState();
    List<LocalizedResourceProto> completedResources = loadCompletedResources(
        pubts.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
        pubts.getStartedResourcesIterator());
    assertTrue(completedResources.isEmpty());
    assertTrue(startedResources.isEmpty());

    // exactly one user is recovered, with an empty private tracker
    Map<String, RecoveredUserResources> userResources =
        loadUserResources(state.getIterator());
    assertEquals(1, userResources.size());
    RecoveredUserResources rur = userResources.get(user);
    // fail with an assertion rather than an NPE if the user is missing
    assertNotNull(rur);
    LocalResourceTrackerState privts = rur.getPrivateTrackerState();
    assertNotNull(privts);
    completedResources = loadCompletedResources(
        privts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        privts.getStartedResourcesIterator());
    assertTrue(completedResources.isEmpty());
    assertTrue(startedResources.isEmpty());

    // the application tracker carries the single in-progress resource
    assertEquals(1, rur.getAppTrackerStates().size());
    LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
    assertNotNull(appts);
    completedResources = loadCompletedResources(
        appts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        appts.getStartedResourcesIterator());
    assertTrue(completedResources.isEmpty());
    assertEquals(1, startedResources.size());
    assertEquals(appRsrcLocalPath,
        startedResources.get(appRsrcProto));
  }

  /**
   * Begins localization of two public resources and checks that, after a
   * restart, both are recovered as in-progress with no completed entries and
   * no per-user state.
   */
  @Test
  public void testStartResourceLocalizationForPublicResources()
      throws IOException {
    // first public resource
    LocalResourcePBImpl firstPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/public/resource1")),
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
            789L, 135L);
    LocalResourceProto firstProto = firstPb.getProto();
    Path firstLocalPath = new Path("/some/local/dir/for/pubrsrc1");
    stateStore.startResourceLocalization(null, null, firstProto,
        firstLocalPath);

    // second public resource
    LocalResourcePBImpl secondPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/public/resource2")),
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
            789L, 135L);
    LocalResourceProto secondProto = secondPb.getProto();
    Path secondLocalPath = new Path("/some/local/dir/for/pubrsrc2");
    stateStore.startResourceLocalization(null, null, secondProto,
        secondLocalPath);

    // after a restart both resources must show up as started
    restartStateStore();
    RecoveredLocalizationState recovered =
        stateStore.loadLocalizationState();
    LocalResourceTrackerState publicState =
        recovered.getPublicTrackerState();
    List<LocalizedResourceProto> completed = loadCompletedResources(
        publicState.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> started = loadStartedResources(
        publicState.getStartedResourcesIterator());
    assertTrue(completed.isEmpty());
    assertEquals(2, started.size());
    assertEquals(firstLocalPath, started.get(firstProto));
    assertEquals(secondLocalPath, started.get(secondProto));

    // public resources must not create any per-user entries
    Map<String, RecoveredUserResources> byUser =
        loadUserResources(recovered.getIterator());
    assertEquals(0, byUser.size());
  }

  /**
   * Begins localization of one private resource for a user and checks that,
   * after a restart, it is recovered as in-progress under that user's private
   * tracker with no application trackers present.
   */
  @Test
  public void testStartResourceLocalizationForPrivateResource()
      throws IOException {
    String user = "somebody";
    LocalResourcePBImpl resourcePb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/private/resource")),
            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
            789L, 680L, "*pattern*");
    LocalResourceProto resourceProto = resourcePb.getProto();
    Path localPath = new Path("/some/local/dir/for/privrsrc");
    stateStore.startResourceLocalization(user, null, resourceProto,
        localPath);

    // after a restart the resource must show up as started for the user
    restartStateStore();
    RecoveredLocalizationState recovered =
        stateStore.loadLocalizationState();
    Map<String, RecoveredUserResources> byUser =
        loadUserResources(recovered.getIterator());
    assertEquals(1, byUser.size());

    RecoveredUserResources forUser = byUser.get(user);
    LocalResourceTrackerState privateState =
        forUser.getPrivateTrackerState();
    assertNotNull(privateState);
    List<LocalizedResourceProto> completed = loadCompletedResources(
        privateState.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> started = loadStartedResources(
        privateState.getStartedResourcesIterator());
    assertTrue(completed.isEmpty());
    assertEquals(1, started.size());
    assertEquals(localPath, started.get(resourceProto));
    assertEquals(0, forUser.getAppTrackerStates().size());
  }

  /**
   * Starts and finishes localization of a single application-scoped resource
   * and checks that, after a restart, it is recovered as completed under the
   * owning user's application tracker while the public and private trackers
   * stay empty.
   */
  @Test
  public void testFinishResourceLocalizationForApplicationResource()
      throws IOException {
    String user = "somebody";
    ApplicationId appId = ApplicationId.newInstance(1, 1);

    // start and finish a local resource for an application
    Path appRsrcPath = new Path("hdfs://some/app/resource");
    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(appRsrcPath),
            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
            123L, 456L);
    LocalResourceProto appRsrcProto = rsrcPb.getProto();
    Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
    stateStore.startResourceLocalization(user, appId, appRsrcProto,
        appRsrcLocalPath);
    LocalizedResourceProto appLocalizedProto =
        LocalizedResourceProto.newBuilder()
          .setResource(appRsrcProto)
          .setLocalPath(appRsrcLocalPath.toString())
          .setSize(1234567L)
          .build();
    stateStore.finishResourceLocalization(user, appId, appLocalizedProto);

    // restart and verify only app resource is completed
    restartStateStore();
    RecoveredLocalizationState state = stateStore.loadLocalizationState();

    // public tracker holds nothing
    LocalResourceTrackerState pubts = state.getPublicTrackerState();
    List<LocalizedResourceProto> completedResources = loadCompletedResources(
        pubts.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
        pubts.getStartedResourcesIterator());
    assertTrue(completedResources.isEmpty());
    assertTrue(startedResources.isEmpty());

    // exactly one user is recovered, with an empty private tracker
    Map<String, RecoveredUserResources> userResources =
        loadUserResources(state.getIterator());
    assertEquals(1, userResources.size());
    RecoveredUserResources rur = userResources.get(user);
    // fail with an assertion rather than an NPE if the user is missing
    assertNotNull(rur);
    LocalResourceTrackerState privts = rur.getPrivateTrackerState();
    assertNotNull(privts);
    completedResources = loadCompletedResources(
        privts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        privts.getStartedResourcesIterator());
    assertTrue(completedResources.isEmpty());
    assertTrue(startedResources.isEmpty());

    // the application tracker carries the single completed resource
    assertEquals(1, rur.getAppTrackerStates().size());
    LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
    assertNotNull(appts);
    completedResources = loadCompletedResources(
        appts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        appts.getStartedResourcesIterator());
    assertTrue(startedResources.isEmpty());
    assertEquals(1, completedResources.size());
    assertEquals(appLocalizedProto,
        completedResources.iterator().next());
  }

  /**
   * Begins localization of two public resources, finishes only the first,
   * and checks that after a restart the first is recovered as completed and
   * the second as still in-progress, with no per-user state.
   */
  @Test
  public void testFinishResourceLocalizationForPublicResources()
      throws IOException {
    // first public resource
    LocalResourcePBImpl firstPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/public/resource1")),
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
            789L, 135L);
    LocalResourceProto firstProto = firstPb.getProto();
    Path firstLocalPath = new Path("/some/local/dir/for/pubrsrc1");
    stateStore.startResourceLocalization(null, null, firstProto,
        firstLocalPath);

    // second public resource
    LocalResourcePBImpl secondPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/public/resource2")),
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
            789L, 135L);
    LocalResourceProto secondProto = secondPb.getProto();
    Path secondLocalPath = new Path("/some/local/dir/for/pubrsrc2");
    stateStore.startResourceLocalization(null, null, secondProto,
        secondLocalPath);

    // only the first resource completes
    LocalizedResourceProto firstLocalized =
        LocalizedResourceProto.newBuilder()
            .setResource(firstProto)
            .setLocalPath(firstLocalPath.toString())
            .setSize(firstProto.getSize())
            .build();
    stateStore.finishResourceLocalization(null, null, firstLocalized);

    // after a restart: one completed, one still started
    restartStateStore();
    RecoveredLocalizationState recovered =
        stateStore.loadLocalizationState();
    LocalResourceTrackerState publicState =
        recovered.getPublicTrackerState();
    List<LocalizedResourceProto> completed = loadCompletedResources(
        publicState.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> started = loadStartedResources(
        publicState.getStartedResourcesIterator());
    assertEquals(1, completed.size());
    assertEquals(firstLocalized, completed.iterator().next());
    assertEquals(1, started.size());
    assertEquals(secondLocalPath, started.get(secondProto));

    // public resources must not create any per-user entries
    Map<String, RecoveredUserResources> byUser =
        loadUserResources(recovered.getIterator());
    assertEquals(0, byUser.size());
  }

  /**
   * Starts and finishes localization of one private resource for a user and
   * checks that, after a restart, it is recovered as completed under that
   * user's private tracker while the public tracker is empty and no
   * application tracker exists.
   */
  @Test
  public void testFinishResourceLocalizationForPrivateResource()
      throws IOException {
    String user = "somebody";
    ApplicationId appId = ApplicationId.newInstance(1, 1);

    Path privRsrcPath = new Path("hdfs://some/private/resource");
    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
        .newInstance(
            URL.fromPath(privRsrcPath),
            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
            789L, 680L, "*pattern*");
    LocalResourceProto privRsrcProto = rsrcPb.getProto();
    Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
    stateStore.startResourceLocalization(user, null, privRsrcProto,
        privRsrcLocalPath);

    LocalizedResourceProto privLocalizedProto =
        LocalizedResourceProto.newBuilder()
          .setResource(privRsrcProto)
          .setLocalPath(privRsrcLocalPath.toString())
          .setSize(privRsrcProto.getSize())
          .build();
    stateStore.finishResourceLocalization(user, null, privLocalizedProto);

    // restart and verify state
    restartStateStore();
    RecoveredLocalizationState state = stateStore.loadLocalizationState();

    // public tracker holds nothing
    LocalResourceTrackerState pubts = state.getPublicTrackerState();
    List<LocalizedResourceProto> completedResources = loadCompletedResources(
        pubts.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
        pubts.getStartedResourcesIterator());
    assertEquals(0, completedResources.size());
    assertEquals(0, startedResources.size());

    // the user's private tracker carries the single completed resource
    Map<String, RecoveredUserResources> userResources =
        loadUserResources(state.getIterator());
    assertEquals(1, userResources.size());
    RecoveredUserResources rur = userResources.get(user);
    // fail with an assertion rather than an NPE if the user is missing
    assertNotNull(rur);
    LocalResourceTrackerState privts = rur.getPrivateTrackerState();
    assertNotNull(privts);
    completedResources = loadCompletedResources(
        privts.getCompletedResourcesIterator());
    startedResources = loadStartedResources(
        privts.getStartedResourcesIterator());
    assertEquals(1, completedResources.size());
    assertEquals(privLocalizedProto,
        completedResources.iterator().next());
    assertTrue(startedResources.isEmpty());

    // no application tracker is recovered for a private-only resource
    assertEquals(0, rur.getAppTrackerStates().size());
    assertNull(rur.getAppTrackerStates().get(appId));
  }

  /**
   * Removes an application-scoped resource twice, once after it finished
   * localizing and once while it was only started, and checks via
   * {@code verifyEmptyState()} after each restart.
   */
  @Test
  public void testRemoveLocalizedResourceForApplicationResource()
      throws IOException {
    String owner = "somebody";
    ApplicationId app = ApplicationId.newInstance(1, 1);

    LocalResourcePBImpl resourcePb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/app/resource")),
            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
            123L, 456L);
    LocalResourceProto resourceProto = resourcePb.getProto();
    Path localPath = new Path("/some/local/dir/for/apprsrc");

    // full lifecycle: start, finish, then remove
    stateStore.startResourceLocalization(owner, app, resourceProto,
        localPath);
    LocalizedResourceProto localized =
        LocalizedResourceProto.newBuilder()
            .setResource(resourceProto)
            .setLocalPath(localPath.toString())
            .setSize(1234567L)
            .build();
    stateStore.finishResourceLocalization(owner, app, localized);
    stateStore.removeLocalizedResource(owner, app, localPath);

    restartStateStore();
    verifyEmptyState();

    // partial lifecycle: start, then remove before it ever finishes
    stateStore.startResourceLocalization(owner, app, resourceProto,
        localPath);
    stateStore.removeLocalizedResource(owner, app, localPath);

    restartStateStore();
    verifyEmptyState();
  }

  /**
   * Localizes two public resources to completion, removes the second, and
   * checks that after a restart only the first is recovered as completed,
   * with nothing in-progress and no per-user state.
   */
  @Test
  public void testRemoveLocalizedResourceForPublicResources()
      throws IOException {
    // first public resource: started and finished, never removed
    LocalResourcePBImpl keptPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/public/resource1")),
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
            789L, 135L);
    LocalResourceProto keptProto = keptPb.getProto();
    Path keptLocalPath = new Path("/some/local/dir/for/pubrsrc1");
    stateStore.startResourceLocalization(null, null, keptProto,
        keptLocalPath);
    LocalizedResourceProto keptLocalized =
        LocalizedResourceProto.newBuilder()
            .setResource(keptProto)
            .setLocalPath(keptLocalPath.toString())
            .setSize(789L)
            .build();
    stateStore.finishResourceLocalization(null, null, keptLocalized);

    // second public resource: started, finished, then removed
    LocalResourcePBImpl removedPb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/public/resource2")),
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
            789L, 135L);
    LocalResourceProto removedProto = removedPb.getProto();
    Path removedLocalPath = new Path("/some/local/dir/for/pubrsrc2");
    stateStore.startResourceLocalization(null, null, removedProto,
        removedLocalPath);
    LocalizedResourceProto removedLocalized =
        LocalizedResourceProto.newBuilder()
            .setResource(removedProto)
            .setLocalPath(removedLocalPath.toString())
            .setSize(7654321L)
            .build();
    stateStore.finishResourceLocalization(null, null, removedLocalized);
    stateStore.removeLocalizedResource(null, null, removedLocalPath);

    // after a restart only the first resource remains, as completed
    restartStateStore();
    RecoveredLocalizationState recovered =
        stateStore.loadLocalizationState();
    LocalResourceTrackerState publicState =
        recovered.getPublicTrackerState();
    List<LocalizedResourceProto> completed = loadCompletedResources(
        publicState.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> started = loadStartedResources(
        publicState.getStartedResourcesIterator());
    assertTrue(started.isEmpty());
    assertEquals(1, completed.size());
    assertEquals(keptLocalized, completed.iterator().next());

    // public resources must not create any per-user entries
    Map<String, RecoveredUserResources> byUser =
        loadUserResources(recovered.getIterator());
    assertTrue(byUser.isEmpty());
  }

  /**
   * Starts localization of one private resource, removes it before it
   * finishes, and checks that after a restart neither the public tracker nor
   * any per-user state contains entries.
   */
  @Test
  public void testRemoveLocalizedResourceForPrivateResource()
      throws IOException {
    String owner = "somebody";

    LocalResourcePBImpl resourcePb = (LocalResourcePBImpl)
        LocalResource.newInstance(
            URL.fromPath(new Path("hdfs://some/private/resource")),
            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
            789L, 680L, "*pattern*");
    LocalResourceProto resourceProto = resourcePb.getProto();
    Path localPath = new Path("/some/local/dir/for/privrsrc");

    // start, then remove without finishing
    stateStore.startResourceLocalization(owner, null, resourceProto,
        localPath);
    stateStore.removeLocalizedResource(owner, null, localPath);

    // after a restart nothing must be left anywhere
    restartStateStore();
    RecoveredLocalizationState recovered =
        stateStore.loadLocalizationState();
    LocalResourceTrackerState publicState =
        recovered.getPublicTrackerState();
    List<LocalizedResourceProto> completed = loadCompletedResources(
        publicState.getCompletedResourcesIterator());
    Map<LocalResourceProto, Path> started = loadStartedResources(
        publicState.getStartedResourcesIterator());
    assertTrue(started.isEmpty());
    assertEquals(0, completed.size());

    Map<String, RecoveredUserResources> byUser =
        loadUserResources(recovered.getIterator());
    assertTrue(byUser.isEmpty());
  }

  /**
   * Walks deletion-task storage through empty, one task, two tasks, back to
   * one and back to empty, restarting the store before each verification.
   */
  @Test
  public void testDeletionTaskStorage() throws IOException {
    // a fresh store reports no deletion tasks
    RecoveredDeletionServiceState recovered =
        stateStore.loadDeletionServiceState();
    List<DeletionServiceDeleteTaskProto> tasks =
        loadDeletionTaskProtos(recovered.getIterator());
    assertTrue(tasks.isEmpty());

    // one fully populated task survives a restart
    DeletionServiceDeleteTaskProto firstTask =
        DeletionServiceDeleteTaskProto.newBuilder()
            .setId(7)
            .setUser("someuser")
            .setSubdir("some/subdir")
            .addBasedirs("some/dir/path")
            .addBasedirs("some/other/dir/path")
            .setDeletionTime(123456L)
            .addSuccessorIds(8)
            .addSuccessorIds(9)
            .build();
    stateStore.storeDeletionTask(firstTask.getId(), firstTask);
    restartStateStore();
    recovered = stateStore.loadDeletionServiceState();
    tasks = loadDeletionTaskProtos(recovered.getIterator());
    assertEquals(1, tasks.size());
    assertEquals(firstTask, tasks.get(0));

    // a second, sparser task is recovered alongside the first
    DeletionServiceDeleteTaskProto secondTask =
        DeletionServiceDeleteTaskProto.newBuilder()
            .setId(8)
            .setUser("user2")
            .setSubdir("subdir2")
            .setDeletionTime(789L)
            .build();
    stateStore.storeDeletionTask(secondTask.getId(), secondTask);
    restartStateStore();
    recovered = stateStore.loadDeletionServiceState();
    tasks = loadDeletionTaskProtos(recovered.getIterator());
    assertEquals(2, tasks.size());
    assertTrue(tasks.contains(firstTask));
    assertTrue(tasks.contains(secondTask));

    // removing the second task leaves only the first after recovery
    stateStore.removeDeletionTask(secondTask.getId());
    restartStateStore();
    recovered = stateStore.loadDeletionServiceState();
    tasks = loadDeletionTaskProtos(recovered.getIterator());
    assertEquals(1, tasks.size());
    assertEquals(firstTask, tasks.get(0));

    // removing the first task as well leaves the store empty again
    stateStore.removeDeletionTask(firstTask.getId());
    restartStateStore();
    recovered = stateStore.loadDeletionServiceState();
    tasks = loadDeletionTaskProtos(recovered.getIterator());
    assertTrue(tasks.isEmpty());
  }

  /**
   * Exercises NM token persistence step by step: current master key,
   * previous master key, per-attempt keys, and finally a mix of additions,
   * updates and removals, restarting the store before each verification.
   */
  @Test
  public void testNMTokenStorage() throws IOException {
    // a fresh store has no master keys and no per-attempt keys
    RecoveredNMTokensState recovered = stateStore.loadNMTokensState();
    Map<ApplicationAttemptId, MasterKey> appKeys =
        loadNMTokens(recovered.getIterator());
    assertNull(recovered.getCurrentMasterKey());
    assertNull(recovered.getPreviousMasterKey());
    assertTrue(appKeys.isEmpty());

    // current master key only
    NMTokenSecretManagerForTest keySource =
        new NMTokenSecretManagerForTest();
    MasterKey firstCurrentKey = keySource.generateKey();
    stateStore.storeNMTokenCurrentMasterKey(firstCurrentKey);
    restartStateStore();
    recovered = stateStore.loadNMTokensState();
    appKeys = loadNMTokens(recovered.getIterator());
    assertEquals(firstCurrentKey, recovered.getCurrentMasterKey());
    assertNull(recovered.getPreviousMasterKey());
    assertTrue(appKeys.isEmpty());

    // current and previous master keys
    MasterKey firstPrevKey = keySource.generateKey();
    stateStore.storeNMTokenPreviousMasterKey(firstPrevKey);
    restartStateStore();
    recovered = stateStore.loadNMTokensState();
    appKeys = loadNMTokens(recovered.getIterator());
    assertEquals(firstCurrentKey, recovered.getCurrentMasterKey());
    assertEquals(firstPrevKey, recovered.getPreviousMasterKey());
    assertTrue(appKeys.isEmpty());

    // two per-attempt keys on top of the master keys
    ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(1, 1), 1);
    MasterKey attemptKey1 = keySource.generateKey();
    stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1);
    ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(2, 3), 4);
    MasterKey attemptKey2 = keySource.generateKey();
    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
    restartStateStore();
    recovered = stateStore.loadNMTokensState();
    appKeys = loadNMTokens(recovered.getIterator());
    assertEquals(firstCurrentKey, recovered.getCurrentMasterKey());
    assertEquals(firstPrevKey, recovered.getPreviousMasterKey());
    assertEquals(2, appKeys.size());
    assertEquals(attemptKey1, appKeys.get(attempt1));
    assertEquals(attemptKey2, appKeys.get(attempt2));

    // add attempt3, drop attempt1, overwrite attempt2 with the old previous
    // key, and roll the master keys forward by one generation
    ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(5, 6), 7);
    MasterKey attemptKey3 = keySource.generateKey();
    stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3);
    stateStore.removeNMTokenApplicationMasterKey(attempt1);
    MasterKey updatedAttemptKey2 = firstPrevKey;
    stateStore.storeNMTokenApplicationMasterKey(attempt2,
        updatedAttemptKey2);
    MasterKey rolledPrevKey = firstCurrentKey;
    stateStore.storeNMTokenPreviousMasterKey(rolledPrevKey);
    MasterKey rolledCurrentKey = keySource.generateKey();
    stateStore.storeNMTokenCurrentMasterKey(rolledCurrentKey);
    restartStateStore();
    recovered = stateStore.loadNMTokensState();
    appKeys = loadNMTokens(recovered.getIterator());
    assertEquals(rolledCurrentKey, recovered.getCurrentMasterKey());
    assertEquals(rolledPrevKey, recovered.getPreviousMasterKey());
    assertEquals(2, appKeys.size());
    assertNull(appKeys.get(attempt1));
    assertEquals(updatedAttemptKey2, appKeys.get(attempt2));
    assertEquals(attemptKey3, appKeys.get(attempt3));
  }

  /**
   * Exercises container token persistence step by step: current master key,
   * previous master key, active container tokens, and finally a mix of
   * additions, updates and removals, restarting the store before each
   * verification.
   */
  @Test
  public void testContainerTokenStorage() throws IOException {
    // a fresh store has no master keys and no active tokens
    RecoveredContainerTokensState recovered =
        stateStore.loadContainerTokensState();
    Map<ContainerId, Long> activeTokens = loadContainerTokens(recovered.it);
    assertNull(recovered.getCurrentMasterKey());
    assertNull(recovered.getPreviousMasterKey());
    assertTrue(activeTokens.isEmpty());

    // current master key only
    ContainerTokenKeyGeneratorForTest keySource =
        new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
    MasterKey firstCurrentKey = keySource.generateKey();
    stateStore.storeContainerTokenCurrentMasterKey(firstCurrentKey);
    restartStateStore();
    recovered = stateStore.loadContainerTokensState();
    activeTokens = loadContainerTokens(recovered.it);
    assertEquals(firstCurrentKey, recovered.getCurrentMasterKey());
    assertNull(recovered.getPreviousMasterKey());
    assertTrue(activeTokens.isEmpty());

    // current and previous master keys
    MasterKey firstPrevKey = keySource.generateKey();
    stateStore.storeContainerTokenPreviousMasterKey(firstPrevKey);
    restartStateStore();
    recovered = stateStore.loadContainerTokensState();
    activeTokens = loadContainerTokens(recovered.it);
    assertEquals(firstCurrentKey, recovered.getCurrentMasterKey());
    assertEquals(firstPrevKey, recovered.getPreviousMasterKey());
    assertTrue(activeTokens.isEmpty());

    // two container tokens on top of the master keys; expiry times stay
    // boxed so they compare against the recovered map values as objects
    ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
    Long expTime1 = 1234567890L;
    ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
    Long expTime2 = 9876543210L;
    stateStore.storeContainerToken(cid1, expTime1);
    stateStore.storeContainerToken(cid2, expTime2);
    restartStateStore();
    recovered = stateStore.loadContainerTokensState();
    activeTokens = loadContainerTokens(recovered.it);
    assertEquals(firstCurrentKey, recovered.getCurrentMasterKey());
    assertEquals(firstPrevKey, recovered.getPreviousMasterKey());
    assertEquals(2, activeTokens.size());
    assertEquals(expTime1, activeTokens.get(cid1));
    assertEquals(expTime2, activeTokens.get(cid2));

    // add cid3, drop cid1, push out cid2's expiry, and roll the master
    // keys forward by one generation
    ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3);
    Long expTime3 = 135798642L;
    stateStore.storeContainerToken(cid3, expTime3);
    stateStore.removeContainerToken(cid1);
    Long extendedExpTime2 = expTime2 + 246897531L;
    stateStore.storeContainerToken(cid2, extendedExpTime2);
    MasterKey rolledPrevKey = firstCurrentKey;
    stateStore.storeContainerTokenPreviousMasterKey(rolledPrevKey);
    MasterKey rolledCurrentKey = keySource.generateKey();
    stateStore.storeContainerTokenCurrentMasterKey(rolledCurrentKey);
    restartStateStore();
    recovered = stateStore.loadContainerTokensState();
    activeTokens = loadContainerTokens(recovered.it);
    assertEquals(rolledCurrentKey, recovered.getCurrentMasterKey());
    assertEquals(rolledPrevKey, recovered.getPreviousMasterKey());
    assertEquals(2, activeTokens.size());
    assertNull(activeTokens.get(cid1));
    assertEquals(extendedExpTime2, activeTokens.get(cid2));
    assertEquals(expTime3, activeTokens.get(cid3));
  }

  /**
   * Stores, reloads and removes log deleter entries, restarting the state
   * store between steps and checking what is loaded afterwards.
   */
  @Test
  public void testLogDeleterStorage() throws IOException {
    // nothing has been stored yet, so nothing should be loaded
    Map<ApplicationId, LogDeleterProto> deleters =
        stateStore.loadLogDeleterState().getLogDeleterMap();
    assertTrue(deleters.isEmpty());

    // persist a first deleter, restart, and expect exactly that entry back
    final ApplicationId firstApp = ApplicationId.newInstance(1, 1);
    final LogDeleterProto firstDeleter = LogDeleterProto.newBuilder()
        .setDeletionTime(1234)
        .setUser("user1")
        .build();
    stateStore.storeLogDeleter(firstApp, firstDeleter);
    restartStateStore();
    deleters = stateStore.loadLogDeleterState().getLogDeleterMap();
    assertEquals(1, deleters.size());
    assertEquals(firstDeleter, deleters.get(firstApp));

    // persist a second deleter, restart, and expect both entries back
    final ApplicationId secondApp = ApplicationId.newInstance(2, 2);
    final LogDeleterProto secondDeleter = LogDeleterProto.newBuilder()
        .setDeletionTime(5678)
        .setUser("user2")
        .build();
    stateStore.storeLogDeleter(secondApp, secondDeleter);
    restartStateStore();
    deleters = stateStore.loadLogDeleterState().getLogDeleterMap();
    assertEquals(2, deleters.size());
    assertEquals(firstDeleter, deleters.get(firstApp));
    assertEquals(secondDeleter, deleters.get(secondApp));

    // drop the first deleter; only the second may remain after a restart
    stateStore.removeLogDeleter(firstApp);
    restartStateStore();
    deleters = stateStore.loadLogDeleterState().getLogDeleterMap();
    assertEquals(1, deleters.size());
    assertEquals(secondDeleter, deleters.get(secondApp));

    // drop the remaining deleter; the loaded state must be empty again
    stateStore.removeLogDeleter(secondApp);
    restartStateStore();
    deleters = stateStore.loadLogDeleterState().getLogDeleterMap();
    assertTrue(deleters.isEmpty());
  }

  /**
   * Verifies that a started store compacts the full key range of its
   * database when a compaction interval is configured.
   *
   * <p>The store is backed by a mock {@link DB} so that the
   * {@code compactRange(null, null)} call can be observed.
   */
  @Test
  public void testCompactionCycle() throws IOException {
    final DB mockdb = mock(DB.class);
    conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
    NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
      // the mock holds no version record, so skip the version check
      @Override
      protected void checkVersion() {}

      // hand out the mock instead of opening a real database
      @Override
      protected DB openDatabase(Configuration conf) {
        return mockdb;
      }
    };
    store.init(conf);
    store.start();
    try {
      // wait up to 10000 ms for at least one full-range compaction
      verify(mockdb, timeout(10000).atLeastOnce()).compactRange(
          (byte[]) isNull(), (byte[]) isNull());
    } finally {
      // close the store even when verification fails so the started
      // service is not left running for the following tests
      store.close();
    }
  }

  /**
   * Verifies that an unrecognized key stored under a container's prefix does
   * not break container recovery, and that the key is deleted when the
   * container is removed from the state store.
   */
  @Test
  public void testUnexpectedKeyDoesntThrowException() throws IOException {
    // test empty when no state
    List<RecoveredContainerState> recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    assertTrue(recoveredContainers.isEmpty());

    ApplicationId appId = ApplicationId.newInstance(1234, 3);
    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
        4);
    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
    StartContainerRequest startContainerRequest = storeMockContainer(
        containerId);

    // add an invalid key; encode it with the leveldb helper rather than the
    // platform default charset so the bytes do not depend on the JVM locale
    byte[] invalidKey = bytes("ContainerManager/containers/"
        + containerId.toString() + "/invalidKey1234");
    stateStore.getDB().put(invalidKey, new byte[1]);
    restartStateStore();
    recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, recoveredContainers.size());
    RecoveredContainerState rcs = recoveredContainers.get(0);
    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
    assertFalse(rcs.getKilled());
    assertEquals(startContainerRequest, rcs.getStartRequest());
    assertTrue(rcs.getDiagnostics().isEmpty());
    assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
    // the unknown key survives recovery and is only cleaned up once the
    // container itself is removed
    assertNotNull(stateStore.getDB().get(invalidKey));
    stateStore.removeContainer(containerId);
    assertNull(stateStore.getDB().get(invalidKey));
  }

  /**
   * Verifies that the AMRMProxy master keys and per-attempt context entries
   * are loaded back after state store restarts, including after rolling and
   * activating a new master key and after removing entries.
   */
  @Test
  public void testAMRMProxyStorage() throws IOException {
    // nothing has been stored yet
    RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState();
    assertThat(state.getCurrentMasterKey()).isNull();
    assertThat(state.getNextMasterKey()).isNull();
    assertThat(state.getAppContexts()).isEmpty();

    ApplicationId appId1 = ApplicationId.newInstance(1, 1);
    ApplicationId appId2 = ApplicationId.newInstance(1, 2);
    ApplicationAttemptId attemptId1 =
        ApplicationAttemptId.newInstance(appId1, 1);
    ApplicationAttemptId attemptId2 =
        ApplicationAttemptId.newInstance(appId2, 2);
    String key1 = "key1";
    String key2 = "key2";
    byte[] data1 = "data1".getBytes();
    byte[] data2 = "data2".getBytes();

    AMRMProxyTokenSecretManager secretManager =
        new AMRMProxyTokenSecretManager(stateStore);
    secretManager.init(conf);
    // Generate currentMasterKey
    secretManager.start();

    try {
      // Add two applications, each with two data entries
      stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1);
      stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1);
      stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2);
      stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2);

      // restart state store and verify recovered
      restartStateStore();
      // hand the secret manager the state store to use after the restart
      secretManager.setNMStateStoreService(stateStore);
      state = stateStore.loadAMRMProxyState();
      assertEquals(secretManager.getCurrentMasterKeyData().getMasterKey(),
          state.getCurrentMasterKey());
      assertThat(state.getNextMasterKey()).isNull();
      assertThat(state.getAppContexts()).hasSize(2);
      // app1
      Map<String, byte[]> map = state.getAppContexts().get(attemptId1);
      assertThat(map).isNotNull();
      assertThat(map).hasSize(2);
      Assert.assertArrayEquals(data1, map.get(key1));
      Assert.assertArrayEquals(data2, map.get(key2));
      // app2
      map = state.getAppContexts().get(attemptId2);
      assertThat(map).isNotNull();
      assertThat(map).hasSize(2);
      Assert.assertArrayEquals(data1, map.get(key1));
      Assert.assertArrayEquals(data2, map.get(key2));

      // Generate next master key and remove one entry of app2
      secretManager.rollMasterKey();
      stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1);

      // restart state store and verify recovered
      restartStateStore();
      secretManager.setNMStateStoreService(stateStore);
      state = stateStore.loadAMRMProxyState();
      assertEquals(secretManager.getCurrentMasterKeyData().getMasterKey(),
          state.getCurrentMasterKey());
      assertEquals(secretManager.getNextMasterKeyData().getMasterKey(),
          state.getNextMasterKey());
      assertThat(state.getAppContexts()).hasSize(2);
      // app1
      map = state.getAppContexts().get(attemptId1);
      assertThat(map).isNotNull();
      assertThat(map).hasSize(2);
      Assert.assertArrayEquals(data1, map.get(key1));
      Assert.assertArrayEquals(data2, map.get(key2));
      // app2
      map = state.getAppContexts().get(attemptId2);
      assertThat(map).isNotNull();
      assertThat(map).hasSize(1);
      Assert.assertArrayEquals(data2, map.get(key2));

      // Activate next master key and remove all entries of app1
      secretManager.activateNextMasterKey();
      stateStore.removeAMRMProxyAppContext(attemptId1);

      // restart state store and verify recovered
      restartStateStore();
      secretManager.setNMStateStoreService(stateStore);
      state = stateStore.loadAMRMProxyState();
      assertEquals(secretManager.getCurrentMasterKeyData().getMasterKey(),
          state.getCurrentMasterKey());
      assertThat(state.getNextMasterKey()).isNull();
      assertThat(state.getAppContexts()).hasSize(1);
      // app2 only
      map = state.getAppContexts().get(attemptId2);
      assertThat(map).isNotNull();
      assertThat(map).hasSize(1);
      Assert.assertArrayEquals(data2, map.get(key2));
    } finally {
      secretManager.stop();
    }
  }

  /**
   * Verifies that resources assigned to a container (gpu, fpga, numa) are
   * stored, recovered after a restart, and can be released again, and that
   * releasing an already released resource type does not throw.
   */
  @Test
  public void testStateStoreForResourceMapping() throws IOException {
    // test that stateStore is initially empty
    List<RecoveredContainerState> recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    assertTrue(recoveredContainers.isEmpty());

    ApplicationId appId = ApplicationId.newInstance(1234, 3);
    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
        4);
    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
    storeMockContainer(containerId);

    Container container = mock(Container.class);
    when(container.getContainerId()).thenReturn(containerId);
    ResourceMappings resourceMappings = new ResourceMappings();
    when(container.getResourceMappings()).thenReturn(resourceMappings);

    stateStore.storeAssignedResources(container, "gpu",
        Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2),
            new GpuDevice(3, 3)));

    // This will overwrite the above
    List<Serializable> gpuRes1 = Arrays.asList(
        new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4));
    stateStore.storeAssignedResources(container, "gpu", gpuRes1);

    List<Serializable> fpgaRes = Arrays.asList(
        new FpgaDevice("testType", 3, 3, "testIPID"),
        new FpgaDevice("testType", 4, 4, "testIPID"),
        new FpgaDevice("testType", 5, 5, "testIPID"),
        new FpgaDevice("testType", 6, 6, "testIPID"));
    stateStore.storeAssignedResources(container, "fpga", fpgaRes);

    List<Serializable> numaRes = Arrays.asList(
        new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10));
    stateStore.storeAssignedResources(container, "numa", numaRes);

    // each resource type is checked both on the recovered state and on the
    // mappings object returned by the mocked container
    restartStateStore();
    recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    assertEquals(1, recoveredContainers.size());
    RecoveredContainerState rcs = recoveredContainers.get(0);
    List<Serializable> resources = rcs.getResourceMappings()
        .getAssignedResources("gpu");
    assertEquals(gpuRes1, resources);
    assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu"));

    resources = rcs.getResourceMappings().getAssignedResources("fpga");
    assertEquals(fpgaRes, resources);
    assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga"));

    resources = rcs.getResourceMappings().getAssignedResources("numa");
    assertEquals(numaRes, resources);
    assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));

    // test removing numa resources from state store
    stateStore.releaseAssignedResources(containerId, "numa");
    recoveredContainers =
        loadContainersState(stateStore.getContainerStateIterator());
    // guard the get(0) below with a clear failure instead of an
    // IndexOutOfBoundsException
    assertEquals(1, recoveredContainers.size());
    ResourceMappings recoveredMappings =
        recoveredContainers.get(0).getResourceMappings();
    assertTrue(recoveredMappings.getAssignedResources("numa").isEmpty());

    // testing calling deletion of non-existing key doesn't break anything
    try {
      stateStore.releaseAssignedResources(containerId, "numa");
    } catch (RuntimeException e) {
      // keep the original exception as the cause so the failure is diagnosable
      throw new AssertionError("Should not throw exception while deleting "
          + "non existing key from statestore", e);
    }
  }

  /**
   * Swaps in a database whose {@code put} always throws and checks that a
   * write surfaces an {@link IOException} caused by a {@link DBException}
   * and that the state store then reports itself as not healthy.
   */
  @Test
  public void testStateStoreNodeHealth() throws IOException {
    // remember the real DB so it can be put back; use a failing mock meanwhile
    final DB workingDb = stateStore.getDB();
    final DB brokenDb = mock(DB.class);
    stateStore.setDB(brokenDb);

    final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(1234, 1), 1);
    Mockito.doThrow(new DBException()).when(brokenDb).
        put(any(byte[].class), any(byte[].class));

    try {
      // any of the "void" store methods would do; this one is simple
      final ContainerId cid = ContainerId.newContainerId(attemptId, 1);
      stateStore.storeContainerKilled(cid);
      Assert.fail("Expected exception not thrown");
    } catch (IOException ioErr) {
      // the DBException must be wrapped as the cause
      assertTrue(ioErr.getCause() instanceof DBException);
      // and the store must now be flagged unhealthy
      assertFalse("Statestore should have been unhealthy",
          stateStore.isHealthy());
    } finally {
      // put the real DB back
      stateStore.setDB(workingDb);
    }
  }

  /**
   * Stores an empty restart-times list for a container and checks that the
   * list loaded after a restart is empty as well.
   */
  @Test
  public void testEmptyRestartTimes() throws IOException {
    final ContainerId containerId = ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(
            ApplicationId.newInstance(1234, 3), 4), 5);
    storeMockContainer(containerId);

    final List<Long> noRestarts = new ArrayList<>();
    stateStore.storeContainerRestartTimes(containerId, noRestarts);
    restartStateStore();

    final List<RecoveredContainerState> recovered =
        loadContainersState(stateStore.getContainerStateIterator());
    assertTrue(recovered.get(0).getRestartTimes().isEmpty());
  }

  /**
   * Builds a fixed start-container request for the given container and
   * stores it in the state store.
   *
   * @param containerId id of the container to store
   * @return the request that was stored
   * @throws IOException if storing the container fails
   */
  private StartContainerRequest storeMockContainer(ContainerId containerId)
      throws IOException {
    // launch context: one local resource, one env var, a two-word command,
    // one service data entry, raw token bytes and two ACLs
    final Map<String, LocalResource> resources = new HashMap<>();
    resources.put("rsrc", LocalResource.newInstance(
        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
        1234567890L));

    final Map<String, String> environment = new HashMap<>();
    environment.put("somevar", "someval");

    final List<String> commands = new ArrayList<>();
    commands.add("somecmd");
    commands.add("somearg");

    final Map<String, ByteBuffer> services = new HashMap<>();
    services.put("someservice", ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));

    final ByteBuffer rawTokens =
        ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});

    final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
    appAcls.put(ApplicationAccessType.VIEW_APP, "viewuser");
    appAcls.put(ApplicationAccessType.MODIFY_APP, "moduser");

    final ContainerLaunchContext launchContext =
        ContainerLaunchContext.newInstance(resources, environment, commands,
            services, rawTokens, appAcls);

    // container token built from an identifier for this container
    final ContainerTokenIdentifier tokenIdentifier =
        new ContainerTokenIdentifier(containerId, "host", "user",
            Resource.newInstance(1357, 3), 9876543210L, 42, 2468,
            Priority.newInstance(7), 13579);
    final Token token = Token.newInstance(tokenIdentifier.getBytes(),
        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
        "tokenservice");

    final StartContainerRequest request =
        StartContainerRequest.newInstance(launchContext, token);
    stateStore.storeContainer(containerId, 0, 0, request);
    return request;
  }

  /**
   * Test-only NM token secret manager that exposes master key creation.
   */
  private static class NMTokenSecretManagerForTest
      extends BaseNMTokenSecretManager {
    /**
     * Creates a new master key.
     *
     * @return the master key taken from the newly created key data
     */
    public MasterKey generateKey() {
      final MasterKey freshKey = createNewMasterKey().getMasterKey();
      return freshKey;
    }
  }

  /**
   * Test-only container token secret manager that exposes master key
   * creation.
   */
  private static class ContainerTokenKeyGeneratorForTest
      extends BaseContainerTokenSecretManager {
    /**
     * @param conf configuration passed through to the base secret manager
     */
    public ContainerTokenKeyGeneratorForTest(Configuration conf) {
      super(conf);
    }

    /**
     * Creates a new master key.
     *
     * @return the master key taken from the newly created key data
     */
    public MasterKey generateKey() {
      final MasterKey freshKey = createNewMasterKey().getMasterKey();
      return freshKey;
    }
  }
}