BaseFederationPoliciesTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.federation.policies;

import static org.mockito.Mockito.mock;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.Test;

/**
 * Base class for policies tests, tests for common reinitialization cases.
 *
 * <p>The policy under test is the one installed through
 * {@link #setPolicy(ConfigurableFederationPolicy)}; it is {@code null} until
 * that setter has been called. The remaining fields are fixtures shared by the
 * tests defined here and exposed through the accessors below.
 */
public abstract class BaseFederationPoliciesTest {

  // Policy under test; null until setPolicy(...) is invoked.
  private ConfigurableFederationPolicy policy;
  // Policy parameters; a Mockito mock unless replaced via setPolicyInfo(...).
  private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class);
  // Sub-clusters considered active, keyed by id; starts out empty.
  private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
  // Context produced by setupContext() or injected through its setter.
  private FederationPolicyInitializationContext federationPolicyContext;
  private ApplicationSubmissionContext applicationSubmissionContext =
      mock(ApplicationSubmissionContext.class);
  // Never replaced: setRand(long) only reseeds this instance.
  private final Random rand = new Random();
  private SubClusterId homeSubCluster;

  private ReservationSubmissionRequest reservationSubmissionRequest =
      mock(ReservationSubmissionRequest.class);

  /**
   * Reinitializing the policy with a complete context (matching policy type,
   * serialized policy info, resolver and facade) must not throw.
   *
   * @throws YarnException if the reinitialization fails.
   */
  @Test
  public void testReinitilialize() throws YarnException {
    // Serialize the policy info before resolving the policy class name, as the
    // original test did.
    ByteBuffer buf = getPolicyInfo().toByteBuffer();
    FederationPolicyInitializationContext fpc = newInitializationContext(
        getPolicy().getClass().getCanonicalName(), buf);
    getPolicy().reinitialize(fpc);
  }

  /**
   * Reinitializing with a {@code null} context must be rejected.
   *
   * @throws YarnException expected, as a
   *           {@link FederationPolicyInitializationException}.
   */
  @Test(expected = FederationPolicyInitializationException.class)
  public void testReinitilializeBad1() throws YarnException {
    getPolicy().reinitialize(null);
  }

  /**
   * Reinitializing with a context on which nothing has been set must be
   * rejected.
   *
   * @throws YarnException expected, as a
   *           {@link FederationPolicyInitializationException}.
   */
  @Test(expected = FederationPolicyInitializationException.class)
  public void testReinitilializeBad2() throws YarnException {
    FederationPolicyInitializationContext fpc =
        new FederationPolicyInitializationContext();
    getPolicy().reinitialize(fpc);
  }

  /**
   * Reinitializing with a configuration whose policy type is
   * {@code "WrongPolicyName"} must be rejected.
   *
   * @throws YarnException expected, as a
   *           {@link FederationPolicyInitializationException}.
   */
  @Test(expected = FederationPolicyInitializationException.class)
  public void testReinitilializeBad3() throws YarnException {
    ByteBuffer buf = mock(ByteBuffer.class);
    FederationPolicyInitializationContext fpc =
        newInitializationContext("WrongPolicyName", buf);
    getPolicy().reinitialize(fpc);
  }

  /**
   * With no active sub-clusters, asking the policy for a decision must fail
   * with a {@link FederationPolicyException}. Router policies are asked for a
   * home sub-cluster; any other policy is treated as a
   * {@link FederationAMRMProxyPolicy} and asked to split resource requests.
   *
   * @throws YarnException expected, as a {@link FederationPolicyException}.
   */
  @Test(expected = FederationPolicyException.class)
  public void testNoSubclusters() throws YarnException {
    // Initialize the policy against a fresh, empty sub-cluster map; the
    // activeSubclusters field itself is left untouched.
    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
        getPolicyInfo(), new HashMap<>());

    ConfigurableFederationPolicy localPolicy = getPolicy();
    if (localPolicy instanceof FederationRouterPolicy) {
      ((FederationRouterPolicy) localPolicy)
          .getHomeSubcluster(getApplicationSubmissionContext(), null);
    } else {
      String[] hosts = new String[] {"host1", "host2"};
      List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
          .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
      ((FederationAMRMProxyPolicy) localPolicy).splitResourceRequests(
          resourceRequests, new HashSet<>());
    }
  }

  /**
   * Builds the initialization context shared by the reinitialization tests:
   * a policy configuration for queue {@code "queue1"}, the resolver returned
   * by {@link FederationPoliciesTestUtil#initResolver()} and a facade created
   * from a fresh {@link Configuration}.
   *
   * @param policyType policy class name stored in the policy configuration.
   * @param policyParams serialized policy parameters stored alongside it.
   * @return the populated context.
   * @throws YarnException if the test utilities fail to build the facade.
   */
  private static FederationPolicyInitializationContext newInitializationContext(
      String policyType, ByteBuffer policyParams) throws YarnException {
    FederationPolicyInitializationContext fpc =
        new FederationPolicyInitializationContext();
    fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
        .newInstance("queue1", policyType, policyParams));
    fpc.setFederationSubclusterResolver(
        FederationPoliciesTestUtil.initResolver());
    Configuration conf = new Configuration();
    fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade(conf));
    return fpc;
  }

  /**
   * @return the policy under test, or {@code null} if none has been set.
   */
  public ConfigurableFederationPolicy getPolicy() {
    return policy;
  }

  /**
   * @param policy the policy the tests in this class should exercise.
   */
  public void setPolicy(ConfigurableFederationPolicy policy) {
    this.policy = policy;
  }

  /**
   * @return the policy parameters used when (re)initializing the policy.
   */
  public WeightedPolicyInfo getPolicyInfo() {
    return policyInfo;
  }

  /**
   * @param policyInfo the policy parameters to use from now on.
   */
  public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
    this.policyInfo = policyInfo;
  }

  /**
   * @return the live (not copied) map of active sub-clusters.
   */
  public Map<SubClusterId, SubClusterInfo> getActiveSubclusters() {
    return activeSubclusters;
  }

  /**
   * @param activeSubclusters the map to use as the set of active
   *          sub-clusters; it is stored by reference.
   */
  public void setActiveSubclusters(
      Map<SubClusterId, SubClusterInfo> activeSubclusters) {
    this.activeSubclusters = activeSubclusters;
  }

  /**
   * @return the stored policy initialization context, or {@code null} if
   *         neither {@link #setupContext()} nor the setter has been called.
   */
  public FederationPolicyInitializationContext getFederationPolicyContext() {
    return federationPolicyContext;
  }

  /**
   * @param federationPolicyContext the initialization context to store.
   */
  public void setFederationPolicyContext(
      FederationPolicyInitializationContext federationPolicyContext) {
    this.federationPolicyContext = federationPolicyContext;
  }

  /**
   * @return the application submission context handed to router policies.
   */
  public ApplicationSubmissionContext getApplicationSubmissionContext() {
    return applicationSubmissionContext;
  }

  /**
   * @param applicationSubmissionContext the submission context to use.
   */
  public void setApplicationSubmissionContext(
      ApplicationSubmissionContext applicationSubmissionContext) {
    this.applicationSubmissionContext = applicationSubmissionContext;
  }

  /**
   * @return the random generator shared by the tests.
   */
  public Random getRand() {
    return rand;
  }

  /**
   * Reseeds the shared random generator; the generator instance itself is
   * kept, so references obtained earlier from {@link #getRand()} see the new
   * seed as well.
   *
   * @param seed the seed to apply.
   */
  public void setRand(long seed) {
    this.rand.setSeed(seed);
  }

  /**
   * @return the stored home sub-cluster id, or {@code null} if unset.
   */
  public SubClusterId getHomeSubCluster() {
    return homeSubCluster;
  }

  /**
   * @param homeSubCluster the home sub-cluster id to store.
   */
  public void setHomeSubCluster(SubClusterId homeSubCluster) {
    this.homeSubCluster = homeSubCluster;
  }

  /**
   * Adds sub-clusters named {@code sc1} .. {@code sc<numSubclusters>} to the
   * active sub-cluster map. Existing entries are not cleared. Every generated
   * entry is in state {@code SC_RUNNING} and uses the same placeholder
   * addresses; only the id differs.
   *
   * @param numSubclusters how many sub-clusters to add; values below 1 add
   *          nothing.
   */
  public void setMockActiveSubclusters(int numSubclusters) {
    for (int i = 1; i <= numSubclusters; i++) {
      SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
      SubClusterInfo sci = SubClusterInfo.newInstance(
          sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", SubClusterState.SC_RUNNING,
          System.currentTimeMillis(), "something");
      getActiveSubclusters().put(sc.toId(), sci);
    }
  }

  /**
   * Produces a cluster-metrics JSON document whose {@code availableMB} value
   * is a random multiple of 1024, all other values being fixed. For
   * {@code id == 5} the value is pinned to {@code 1024 * 277 * 100}, which is
   * larger than any value the random draw can produce, making that cluster
   * the one with the most available memory.
   *
   * @param id identifier of the sub-cluster the metrics are generated for.
   * @return the JSON document as a string.
   */
  public String generateClusterMetricsInfo(int id) {
    // The generator is consulted on every call, including for id 5, so the
    // random sequence consumed does not depend on the id. The arithmetic is
    // done in long so a larger bound could not overflow int before widening.
    long mem = 1024L * getRand().nextInt(277 * 100 - 1);
    // plant a best cluster
    if (id == 5) {
      mem = 1024L * 277 * 100;
    }
    return
        "{\"clusterMetrics\":{\"appsSubmitted\":65, \"appsCompleted\":64,\"appsPending\":0,"
        + "\"appsRunning\":0, \"appsFailed\":0, \"appsKilled\":1,\"reservedMB\":0,\"availableMB\":"
        + mem + ", \"allocatedMB\":0,\"reservedVirtualCores\":0, \"availableVirtualCores\":2216,"
        + "\"allocatedVirtualCores\":0, \"containersAllocated\":0,\"containersReserved\":0,"
        + "\"containersPending\":0,\"totalMB\":28364800, \"totalVirtualCores\":2216,"
        + "\"totalNodes\":278, \"lostNodes\":1,\"unhealthyNodes\":0,\"decommissionedNodes\":0, "
        + "\"rebootedNodes\":0, \"activeNodes\":277}}";
  }

  /**
   * Reinitializes the facade returned by
   * {@link FederationStateStoreFacade#getInstance} with a new
   * {@link MemoryFederationStateStore} and registers every sub-cluster
   * currently in {@link #getActiveSubclusters()} with that store.
   *
   * @return the reinitialized facade.
   * @throws YarnException if initializing the store or registering a
   *           sub-cluster fails.
   */
  public FederationStateStoreFacade getMemoryFacade() throws YarnException {

    // setting up a store and its facade (with caching off)
    YarnConfiguration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
    // NOTE(review): getInstance presumably hands back a shared facade, in
    // which case this call replaces the store for other users too - confirm.
    FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance(conf);
    FederationStateStore store = new MemoryFederationStateStore();
    store.init(conf);
    fedFacade.reinitialize(store, conf);

    for (SubClusterInfo sinfo : getActiveSubclusters().values()) {
      store.registerSubCluster(SubClusterRegisterRequest.newInstance(sinfo));
    }

    return fedFacade;
  }

  /**
   * @return the reservation submission request fixture.
   */
  public ReservationSubmissionRequest getReservationSubmissionRequest() {
    return reservationSubmissionRequest;
  }

  /**
   * @param reservationSubmissionRequest the reservation request to use.
   */
  public void setReservationSubmissionRequest(
      ReservationSubmissionRequest reservationSubmissionRequest) {
    this.reservationSubmissionRequest = reservationSubmissionRequest;
  }

  /**
   * Builds a policy initialization context from the current policy, policy
   * info, active sub-clusters and a facade from {@link #getMemoryFacade()},
   * then stores it as the federation policy context.
   *
   * @throws YarnException if building the facade or the context fails.
   */
  public void setupContext() throws YarnException {
    FederationPolicyInitializationContext context =
        FederationPoliciesTestUtil.initializePolicyContext2(getPolicy(),
        getPolicyInfo(), getActiveSubclusters(), getMemoryFacade());
    this.setFederationPolicyContext(context);
  }
}