TestSubClusterCleaner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Unit test for Sub-cluster Cleaner in GPG.
 */
public class TestSubClusterCleaner {

  private Configuration conf;
  private MemoryFederationStateStore stateStore;
  private FederationStateStoreFacade facade;
  private SubClusterCleaner cleaner;
  private GPGContext gpgContext;

  private static final long TWO_SECONDS = TimeUnit.SECONDS.toMillis(2);

  private ArrayList<SubClusterId> subClusterIds;

  @BeforeEach
  public void setup() throws YarnException {
    conf = new YarnConfiguration();

    // subcluster expires in one second
    conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1000);

    stateStore = new MemoryFederationStateStore();
    stateStore.init(conf);

    facade = FederationStateStoreFacade.getInstance(conf);
    facade.reinitialize(stateStore, conf);

    gpgContext = new GPGContextImpl();
    gpgContext.setStateStoreFacade(facade);

    cleaner = new SubClusterCleaner(conf, gpgContext);

    // Create and register six sub clusters
    subClusterIds = new ArrayList<SubClusterId>();
    for (int i = 0; i < 3; i++) {
      // Create sub cluster id and info
      SubClusterId subClusterId =
          SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i));

      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
          "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4",
          SubClusterState.SC_RUNNING, System.currentTimeMillis(), "");
      // Register the sub cluster
      stateStore.registerSubCluster(
          SubClusterRegisterRequest.newInstance(subClusterInfo));
      // Append the id to a local list
      subClusterIds.add(subClusterId);
    }
  }

  @AfterEach
  public void breakDown() throws Exception {
    stateStore.close();
  }

  @Test
  public void testSubClusterRegisterHeartBeatTime() throws YarnException {
    cleaner.run();
    Assertions.assertEquals(3, facade.getSubClusters(true, true).size());
  }

  /**
   * Test the base use case.
   */
  @Test
  public void testSubClusterHeartBeat() throws YarnException {
    // The first subcluster reports as Unhealthy
    SubClusterId subClusterId = subClusterIds.get(0);
    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
        .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity"));

    // The second subcluster didn't heartbeat for two seconds, should mark lost
    subClusterId = subClusterIds.get(1);
    stateStore.setSubClusterLastHeartbeat(subClusterId,
        System.currentTimeMillis() - TWO_SECONDS);

    cleaner.run();
    Assertions.assertEquals(1, facade.getSubClusters(true, true).size());
  }
}