GlobalPolicyGenerator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.globalpolicygenerator;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.WebServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Global Policy Generator (GPG) is a Yarn Federation component. By tuning the
 * Federation policies in Federation State Store, GPG overlooks the entire
 * federated cluster and ensures that the system is tuned and balanced all the
 * time.
 *
 * The GPG operates continuously but out-of-band from all cluster operations,
 * that allows to enforce global invariants, affect load balancing, trigger
 * draining of sub-clusters that will undergo maintenance, etc.
 */
public class GlobalPolicyGenerator extends CompositeService {

  public static final Logger LOG =
      LoggerFactory.getLogger(GlobalPolicyGenerator.class);

  // YARN Variables
  private static CompositeServiceShutdownHook gpgShutdownHook;
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
  private AtomicBoolean isStopping = new AtomicBoolean(false);
  private static final String METRICS_NAME = "Global Policy Generator";
  private static long gpgStartupTime = System.currentTimeMillis();

  // Federation Variables
  private GPGContext gpgContext;
  private RegistryOperations registry;

  // Scheduler service that runs tasks periodically
  private ScheduledThreadPoolExecutor scheduledExecutorService;
  private SubClusterCleaner subClusterCleaner;
  private ApplicationCleaner applicationCleaner;
  private PolicyGenerator policyGenerator;
  private String webAppAddress;
  private JvmPauseMonitor pauseMonitor;
  private WebApp webApp;

  public GlobalPolicyGenerator() {
    super(GlobalPolicyGenerator.class.getName());
    this.gpgContext = new GPGContextImpl();
  }

  protected void doSecureLogin() throws IOException {
    Configuration config = getConfig();
    SecurityUtil.login(config, YarnConfiguration.GPG_KEYTAB,
        YarnConfiguration.GPG_PRINCIPAL, getHostName(config));
  }

  protected void initAndStart(Configuration conf, boolean hasToReboot) {
    // Remove the old hook if we are rebooting.
    if (hasToReboot && null != gpgShutdownHook) {
      ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
    }
    gpgShutdownHook = new CompositeServiceShutdownHook(this);
    ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
        SHUTDOWN_HOOK_PRIORITY);
    this.init(conf);
    this.start();
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    UserGroupInformation.setConfiguration(conf);
    // Set up the context
    this.gpgContext.setStateStoreFacade(FederationStateStoreFacade.getInstance(conf));
    GPGPolicyFacade gpgPolicyFacade =
        new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf);
    this.gpgContext.setPolicyFacade(gpgPolicyFacade);

    this.registry = FederationStateStoreFacade.createInstance(conf,
        YarnConfiguration.YARN_REGISTRY_CLASS,
        YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
        RegistryOperations.class);
    this.registry.init(conf);

    UserGroupInformation user = UserGroupInformation.getCurrentUser();
    FederationRegistryClient registryClient =
        new FederationRegistryClient(conf, this.registry, user);
    this.gpgContext.setRegistryClient(registryClient);

    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
        conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
            YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
    this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);

    this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
        YarnConfiguration.GPG_APPCLEANER_CLASS,
        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, ApplicationCleaner.class);
    this.applicationCleaner.init(conf, this.gpgContext);

    this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);

    this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
    DefaultMetricsSystem.initialize(METRICS_NAME);
    JvmMetrics jm = JvmMetrics.initSingleton("GPG", null);
    pauseMonitor = new JvmPauseMonitor();
    addService(pauseMonitor);
    jm.setPauseMonitor(pauseMonitor);

    // super.serviceInit after all services are added
    super.serviceInit(conf);
    WebServiceClient.initialize(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    try {
      doSecureLogin();
    } catch (IOException e) {
      throw new YarnRuntimeException("Failed GPG login", e);
    }

    super.serviceStart();

    this.registry.start();

    // Schedule SubClusterCleaner service
    Configuration config = getConfig();
    long scCleanerIntervalMs = config.getTimeDuration(
        YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
        YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);
    if (scCleanerIntervalMs > 0) {
      this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
          0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
      LOG.info("Scheduled sub-cluster cleaner with interval: {}",
          DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
    }

    // Schedule ApplicationCleaner service
    long appCleanerIntervalMs = config.getTimeDuration(
        YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
        YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);

    if (appCleanerIntervalMs > 0) {
      this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
          0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
      LOG.info("Scheduled application cleaner with interval: {}",
          DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
    }

    // Schedule PolicyGenerator
    // We recommend using yarn.federation.gpg.policy.generator.interval
    // instead of yarn.federation.gpg.policy.generator.interval-ms

    // To ensure compatibility,
    // let's first obtain the value of "yarn.federation.gpg.policy.generator.interval-ms."
    long policyGeneratorIntervalMillis = 0L;
    String generatorIntervalMS = config.get(YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS);
    if (generatorIntervalMS != null) {
      LOG.warn("yarn.federation.gpg.policy.generator.interval-ms is deprecated property, " +
          " we better set it yarn.federation.gpg.policy.generator.interval.");
      policyGeneratorIntervalMillis = Long.parseLong(generatorIntervalMS);
    }

    // If it is not available, let's retrieve
    // the value of "yarn.federation.gpg.policy.generator.interval" instead.
    if (policyGeneratorIntervalMillis == 0) {
      policyGeneratorIntervalMillis = config.getTimeDuration(
          YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL,
          YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL, TimeUnit.MILLISECONDS);
    }

    if(policyGeneratorIntervalMillis > 0){
      this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator,
          0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS);
      LOG.info("Scheduled policy-generator with interval: {}",
          DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
    }
    startWepApp();
  }

  @Override
  protected void serviceStop() throws Exception {
    if (this.registry != null) {
      this.registry.stop();
      this.registry = null;
    }

    try {
      if (this.scheduledExecutorService != null
          && !this.scheduledExecutorService.isShutdown()) {
        this.scheduledExecutorService.shutdown();
        LOG.info("Stopped ScheduledExecutorService");
      }
    } catch (Exception e) {
      LOG.error("Failed to shutdown ScheduledExecutorService", e);
      throw e;
    }

    if (this.isStopping.getAndSet(true)) {
      return;
    }
    if (webApp != null) {
      webApp.stop();
    }
    DefaultMetricsSystem.shutdown();
    super.serviceStop();
    WebServiceClient.destroy();
  }

  public String getName() {
    return "FederationGlobalPolicyGenerator";
  }

  public GPGContext getGPGContext() {
    return this.gpgContext;
  }

  @VisibleForTesting
  public void startWepApp() {
    Configuration configuration = getConfig();

    boolean enableCors = configuration.getBoolean(YarnConfiguration.GPG_WEBAPP_ENABLE_CORS_FILTER,
        YarnConfiguration.DEFAULT_GPG_WEBAPP_ENABLE_CORS_FILTER);

    if (enableCors) {
      configuration.setBoolean(HttpCrossOriginFilterInitializer.PREFIX
          + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);
    }

    // Always load pseudo authentication filter to parse "user.name" in an URL
    // to identify a HTTP request's user.
    boolean hasHadoopAuthFilterInitializer = false;
    String filterInitializerConfKey = "hadoop.http.filter.initializers";
    Class<?>[] initializersClasses = configuration.getClasses(filterInitializerConfKey);

    List<String> targets = new ArrayList<>();
    if (initializersClasses != null) {
      for (Class<?> initializer : initializersClasses) {
        if (initializer.getName().equals(AuthenticationFilterInitializer.class.getName())) {
          hasHadoopAuthFilterInitializer = true;
          break;
        }
        targets.add(initializer.getName());
      }
    }
    if (!hasHadoopAuthFilterInitializer) {
      targets.add(AuthenticationFilterInitializer.class.getName());
      configuration.set(filterInitializerConfKey, StringUtils.join(",", targets));
    }
    LOG.info("Instantiating GPGWebApp at {}.", webAppAddress);
    GPGWebApp gpgWebApp = new GPGWebApp(this);
    webApp = WebApps.$for("gpg", GPGContext.class, this.gpgContext,
        "gpg-ws").at(webAppAddress).
         withResourceConfig(gpgWebApp.resourceConfig()).start(gpgWebApp);
  }

  @SuppressWarnings("resource")
  public static void startGPG(String[] argv, Configuration conf) {
    boolean federationEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
        YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
    if (federationEnabled) {
      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
      StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv, LOG);
      GlobalPolicyGenerator globalPolicyGenerator = new GlobalPolicyGenerator();
      globalPolicyGenerator.initAndStart(conf, false);
    } else {
      LOG.warn("Federation is not enabled. The gpg cannot start.");
    }
  }

  /**
   * Returns the hostname for this Router. If the hostname is not
   * explicitly configured in the given config, then it is determined.
   *
   * @param config configuration
   * @return the hostname (NB: may not be a FQDN)
   * @throws UnknownHostException if the hostname cannot be determined
   */
  private String getHostName(Configuration config)
      throws UnknownHostException {
    String name = config.get(YarnConfiguration.GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY);
    if (name == null) {
      name = InetAddress.getLocalHost().getHostName();
    }
    return name;
  }

  public static void main(String[] argv) {
    try {
      YarnConfiguration conf = new YarnConfiguration();
      GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
      argv = hParser.getRemainingArgs();
      if (argv.length > 1) {
        if (argv[0].equals("-format-policy-store")) {
          handFormatPolicyStateStore(conf);
        } else {
          printUsage(System.err);
        }
      } else {
        startGPG(argv, conf);
      }
    } catch (Throwable t) {
      LOG.error("Error starting global policy generator", t);
      System.exit(-1);
    }
  }

  public static long getGPGStartupTime() {
    return gpgStartupTime;
  }

  @VisibleForTesting
  public WebApp getWebApp() {
    return webApp;
  }

  private static void printUsage(PrintStream out) {
    out.println("Usage: yarn gpg [-format-policy-store]");
  }

  private static void handFormatPolicyStateStore(Configuration conf) {
    try {
      System.out.println("Deleting Federation policy state store.");
      FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
      System.out.println("Federation policy state store has been cleaned.");
      facade.deleteAllPoliciesConfigurations();
    } catch (Exception e) {
      LOG.error("Delete Federation policy state store error.", e);
      System.err.println("Delete Federation policy state store error, exception = " + e);
    }
  }

  @Override
  public void setConfig(Configuration conf) {
    super.setConfig(conf);
  }
}