TestFedAppReportFetcher.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.webproxy;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.fail;

public class TestFedAppReportFetcher {

  private Configuration conf;
  private static ApplicationHistoryProtocol history;

  private SubClusterId subClusterId1 = SubClusterId.newInstance("subCluster1");
  private SubClusterId subClusterId2 = SubClusterId.newInstance("subCluster2");
  private SubClusterInfo clusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1000",
      "10.0.0.1:1000", "10.0.0.1:1000", "10.0.0.1:1000", SubClusterState.SC_RUNNING, 0L, "");
  private SubClusterInfo clusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1000",
      "10.0.0.2:1000", "10.0.0.2:1000", "10.0.0.2:1000", SubClusterState.SC_RUNNING, 0L, "");
  private ApplicationClientProtocol appManager1;
  private ApplicationClientProtocol appManager2;
  private ApplicationId appId1 = ApplicationId.newInstance(0, 1);
  private ApplicationId appId2 = ApplicationId.newInstance(0, 2);

  private static FedAppReportFetcher fetcher;
  private final String appNotFoundExceptionMsg = "APP NOT FOUND";

  @After
  public void cleanUp() {
    history = null;
    fetcher = null;
  }

  private void testHelper(boolean isAHSEnabled)
      throws YarnException, IOException {
    conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, isAHSEnabled);

    FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance(this.conf);
    FederationStateStore fss = new MemoryFederationStateStore();
    fss.init(conf);
    fedFacade.reinitialize(fss, conf);

    fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo1));
    fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo2));
    fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
        .newInstance(ApplicationHomeSubCluster.newInstance(appId1, subClusterId1)));
    fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
        .newInstance(ApplicationHomeSubCluster.newInstance(appId2, subClusterId2)));

    appManager1 = Mockito.mock(ApplicationClientProtocol.class);
    Mockito.when(appManager1.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
        .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));

    appManager2 = Mockito.mock(ApplicationClientProtocol.class);
    Mockito.when(appManager2.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
        .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));

    fetcher = new TestFedAppReportFetcher.FedAppReportFetcherForTest(conf);
    fetcher.registerSubCluster(clusterInfo1, appManager1);
    fetcher.registerSubCluster(clusterInfo2, appManager2);
  }

  @Test
  public void testFetchReportAHSEnabled() throws YarnException, IOException {
    testHelper(true);
    fetcher.getApplicationReport(appId1);
    fetcher.getApplicationReport(appId2);
    Mockito.verify(history, Mockito.times(2))
        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
    Mockito.verify(appManager1, Mockito.times(1))
        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
    Mockito.verify(appManager2, Mockito.times(1))
        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
  }

  @Test
  public void testFetchReportAHSDisabled() throws Exception {
    testHelper(false);

    /* RM will not know of the app and Application History Service is disabled
     * So we will not try to get the report from AHS and RM will throw
     * ApplicationNotFoundException
     */
    LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg,
        () -> fetcher.getApplicationReport(appId1));
    LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg,
        () -> fetcher.getApplicationReport(appId2));

    Mockito.verify(appManager1, Mockito.times(1))
        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
    Mockito.verify(appManager2, Mockito.times(1))
        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
    Assert.assertNull("HistoryManager should be null as AHS is disabled", history);
  }

  @Test
  public void testGetRmAppPageUrlBase() throws IOException, YarnException {
    testHelper(true);
    String scheme = WebAppUtils.getHttpSchemePrefix(conf);
    Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId1),
        StringHelper.pjoin(scheme + clusterInfo1.getRMWebServiceAddress(), "cluster", "app"));
    Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId2),
        StringHelper.pjoin(scheme + clusterInfo2.getRMWebServiceAddress(), "cluster", "app"));
  }

  static class FedAppReportFetcherForTest extends FedAppReportFetcher {

    FedAppReportFetcherForTest(Configuration conf) {
      super(conf);
    }

    @Override
    protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
        throws IOException {
      GetApplicationReportResponse resp = Mockito.mock(GetApplicationReportResponse.class);
      history = Mockito.mock(ApplicationHistoryProtocol.class);
      try {
        Mockito.when(history.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
            .thenReturn(resp);
      } catch (YarnException e) {
        // This should never happen
        fail("Found exception when getApplicationReport!");
      }
      return history;
    }
  }
}