RouterBlock.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.router.webapp;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.glassfish.jersey.jettison.JettisonJaxbContext;
import org.glassfish.jersey.jettison.JettisonMarshaller;

import javax.ws.rs.client.Client;
import java.io.StringWriter;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;

public abstract class RouterBlock extends HtmlBlock {

  private final Router router;
  private final ViewContext ctx;
  private final FederationStateStoreFacade facade;
  private final Configuration conf;

  public static final String ROUTER = "router";

  public RouterBlock(Router router, ViewContext ctx) {
    super(ctx);
    this.ctx = ctx;
    this.router = router;
    this.facade = FederationStateStoreFacade.getInstance(router.getConfig());
    this.conf = this.router.getConfig();
  }

  /**
   * Get RouterClusterMetrics Info.
   *
   * @return Router ClusterMetricsInfo.
   */
  protected ClusterMetricsInfo getRouterClusterMetricsInfo() {
    boolean isEnabled = isYarnFederationEnabled();
    String webAppAddress;
    if(isEnabled) {
      webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
    } else {
      webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
    }
    return getClusterMetricsInfo(webAppAddress);
  }

  /**
   * Get RouterClusterMetrics Info.
   *
   * @param webAppAddress webAppAddress.
   * @return ClusterMetricsInfo.
   */
  protected ClusterMetricsInfo getClusterMetricsInfo(String webAppAddress) {
    // If webAppAddress is empty, we will return NULL.
    if (StringUtils.isBlank(webAppAddress)) {
      return null;
    }

    // We will get ClusterMetricsInfo By webAppAddress.
    Client client = RouterWebServiceUtil.createJerseyClient(conf);
    ClusterMetricsInfo metrics = RouterWebServiceUtil
        .genericForward(webAppAddress, null, ClusterMetricsInfo.class, HTTPMethods.GET,
        RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, null, null,
        conf, client);
    client.close();
    return metrics;
  }

  /**
   * Get a list of subclusters.
   *
   * @return subcluster List.
   */
  protected List<SubClusterInfo> getSubClusterInfoList() {
    List<SubClusterInfo> subClusters = new ArrayList<>();
    try {
      Map<SubClusterId, SubClusterInfo> subClustersInfo = facade.getSubClusters(true);
      // Sort the SubClusters.
      subClusters.addAll(subClustersInfo.values());
      Comparator<? super SubClusterInfo> cmp = Comparator.comparing(o -> o.getSubClusterId());
      Collections.sort(subClusters, cmp);

      // Return results
      return subClusters;
    } catch (YarnException e) {
      LOG.error("getSubClusterInfoList error.", e);
      return subClusters;
    }
  }

  /**
   * Whether Yarn Federation is enabled.
   *
   * @return true, enable yarn federation; false, not enable yarn federation;
   */
  protected boolean isYarnFederationEnabled() {
    boolean isEnabled = conf.getBoolean(
        YarnConfiguration.FEDERATION_ENABLED,
        YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
    return isEnabled;
  }

  /**
   * Get a list of SubClusterIds for ActiveSubClusters.
   *
   * @return list of SubClusterIds.
   */
  protected List<String> getActiveSubClusterIds() {
    List<String> result = new ArrayList<>();
    try {
      Map<SubClusterId, SubClusterInfo> subClustersInfo = facade.getSubClusters(true);
      subClustersInfo.values().stream().forEach(subClusterInfo -> {
        result.add(subClusterInfo.getSubClusterId().getId());
      });
    } catch (Exception e) {
      LOG.error("getActiveSubClusters error.", e);
    }
    return result;
  }

  /**
   * init SubCluster MetricsOverviewTable.
   *
   * @param html HTML Object.
   * @param subclusterId subClusterId
   */
  protected void initSubClusterMetricsOverviewTable(Block html, String subclusterId) {
    MetricsOverviewTable metricsOverviewTable = new MetricsOverviewTable(this.router, this.ctx);
    metricsOverviewTable.render(html, subclusterId);
  }

  /**
   * Get ClusterMetricsInfo By SubClusterId.
   *
   * @param subclusterId subClusterId
   * @return SubCluster RM ClusterMetricsInfo
   */
  protected ClusterMetricsInfo getClusterMetricsInfoBySubClusterId(String subclusterId) {
    try {
      SubClusterId subClusterId = SubClusterId.newInstance(subclusterId);
      SubClusterInfo subClusterInfo = facade.getSubCluster(subClusterId);
      if (subClusterInfo != null) {
        Client client = RouterWebServiceUtil.createJerseyClient(this.conf);
        // Call the RM interface to obtain schedule information
        String webAppAddress =  WebAppUtils.getHttpSchemePrefix(this.conf) +
            subClusterInfo.getRMWebServiceAddress();
        ClusterMetricsInfo metrics = RouterWebServiceUtil
            .genericForward(webAppAddress, null, ClusterMetricsInfo.class, HTTPMethods.GET,
            RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, null, null,
            conf, client);
        client.close();
        return metrics;
      }
    } catch (Exception e) {
      LOG.error("getClusterMetricsInfoBySubClusterId subClusterId = {} error.", subclusterId, e);
    }
    return null;
  }

  /**
   * Get SubClusterInfo based on subclusterId.
   *
   * @param subclusterId subCluster Id
   * @return SubClusterInfo Collection
   */
  protected Collection<SubClusterInfo> getSubClusterInfoList(String subclusterId) {
    try {
      SubClusterId subClusterId = SubClusterId.newInstance(subclusterId);
      SubClusterInfo subClusterInfo = facade.getSubCluster(subClusterId);
      return Collections.singletonList(subClusterInfo);
    } catch (Exception e) {
      LOG.error("getSubClusterInfoList subClusterId = {} error.", subclusterId, e);
    }
    return null;
  }

  public FederationStateStoreFacade getFacade() {
    return facade;
  }

  /**
   * Initialize the Nodes menu.
   *
   * @param mainList HTML Object.
   * @param subClusterIds subCluster List.
   */
  protected void initNodesMenu(Hamlet.UL<Hamlet.DIV<Hamlet>> mainList,
      List<String> subClusterIds) {
    if (CollectionUtils.isNotEmpty(subClusterIds)) {
      Hamlet.UL<Hamlet.LI<Hamlet.UL<Hamlet.DIV<Hamlet>>>> nodesList =
          mainList.li().a(url("nodes"), "Nodes").ul().
          $style("padding:0.3em 1em 0.1em 2em");

      // ### nodes info
      nodesList.li().__();
      for (String subClusterId : subClusterIds) {
        nodesList.li().a(url("nodes", subClusterId), subClusterId).__();
      }
      nodesList.__().__();
    } else {
      mainList.li().a(url("nodes"), "Nodes").__();
    }
  }

  /**
   * Initialize the Applications menu.
   *
   * @param mainList HTML Object.
   * @param subClusterIds subCluster List.
   */
  protected void initApplicationsMenu(Hamlet.UL<Hamlet.DIV<Hamlet>> mainList,
      List<String> subClusterIds) {
    if (CollectionUtils.isNotEmpty(subClusterIds)) {
      Hamlet.UL<Hamlet.LI<Hamlet.UL<Hamlet.DIV<Hamlet>>>> apps =
          mainList.li().a(url("apps"), "Applications").ul();
      apps.li().__();
      for (String subClusterId : subClusterIds) {
        Hamlet.LI<Hamlet.UL<Hamlet.LI<Hamlet.UL<Hamlet.DIV<Hamlet>>>>> subClusterList = apps.
            li().a(url("apps", subClusterId), subClusterId);
        Hamlet.UL<Hamlet.LI<Hamlet.UL<Hamlet.LI<Hamlet.UL<Hamlet.DIV<Hamlet>>>>>> subAppStates =
            subClusterList.ul().$style("padding:0.3em 1em 0.1em 2em");
        subAppStates.li().__();
        for (YarnApplicationState state : YarnApplicationState.values()) {
          subAppStates.
              li().a(url("apps", subClusterId, state.toString()), state.toString()).__();
        }
        subAppStates.li().__().__();
        subClusterList.__();
      }
      apps.__().__();
    } else {
      mainList.li().a(url("apps"), "Applications").__();
    }
  }

  /**
   * Initialize the NodeLabels menu.
   *
   * @param mainList HTML Object.
   * @param subClusterIds subCluster List.
   */
  protected void initNodeLabelsMenu(Hamlet.UL<Hamlet.DIV<Hamlet>> mainList,
      List<String> subClusterIds) {

    if (CollectionUtils.isNotEmpty(subClusterIds)) {
      Hamlet.UL<Hamlet.LI<Hamlet.UL<Hamlet.DIV<Hamlet>>>> nodesList =
          mainList.li().a(url("nodelabels"), "Node Labels").ul().
          $style("padding:0.3em 1em 0.1em 2em");

      // ### nodelabels info
      nodesList.li().__();
      for (String subClusterId : subClusterIds) {
        nodesList.li().a(url("nodelabels", subClusterId), subClusterId).__();
      }
      nodesList.__().__();
    } else {
      mainList.li().a(url("nodelabels"), "Node Labels").__();
    }
  }

  /**
   * Generate SubClusterInfo based on local cluster information.
   *
   * @param config Configuration.
   * @return SubClusterInfo.
   */
  protected SubClusterInfo getSubClusterInfoByLocalCluster(Configuration config) {

    Client client = null;
    try {

      // Step1. Retrieve the name of the local cluster and ClusterMetricsInfo.
      String localClusterName = config.get(YarnConfiguration.RM_CLUSTER_ID, UNAVAILABLE);
      String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(config);
      String rmWebAppURLWithoutScheme = WebAppUtils.getRMWebAppURLWithoutScheme(config);
      client = RouterWebServiceUtil.createJerseyClient(config);
      ClusterMetricsInfo clusterMetricsInfos = RouterWebServiceUtil
          .genericForward(webAppAddress, null, ClusterMetricsInfo.class, HTTPMethods.GET,
          RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, null, null,
           config, client);

      if (clusterMetricsInfos == null) {
        return null;
      }

      // Step2. Retrieve cluster information for the local cluster to obtain its startup time.
      ClusterInfo clusterInfo = RouterWebServiceUtil.genericForward(webAppAddress, null,
          ClusterInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.INFO,
          null, null, config, client);

      if (clusterInfo == null) {
        return null;
      }

      // Step3. Get Local-Cluster Capability
      JettisonJaxbContext jc = new JettisonJaxbContext(ClusterMetricsInfo.class);
      JettisonMarshaller marshaller = jc.createJsonMarshaller();
      StringWriter writer = new StringWriter();
      marshaller.marshallToJSON(clusterMetricsInfos, writer);
      String capability = writer.toString();

      // Step4. Generate SubClusterInfo.
      SubClusterId subClusterId = SubClusterId.newInstance(localClusterName);
      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
          rmWebAppURLWithoutScheme, SubClusterState.SC_RUNNING, clusterInfo.getStartedOn(),
          Time.now(), capability);

      return subClusterInfo;
    } catch (Exception e) {
      LOG.error("An error occurred while parsing the local YARN cluster.", e);
    } finally {
      if (client != null) {
        client.close();
      }
    }
    return null;
  }
}