GPGPolicyFacade.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.globalpolicygenerator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * A utility class for the GPG Policy Generator to read and write policies
 * into the FederationStateStore. Policy specific logic is abstracted away in
 * this class, so the PolicyGenerator can avoid dealing with policy
 * construction, reinitialization, and serialization.
 *
 * There are only two exposed methods:
 *
 * {@link #getPolicyManager(String)}
 * Gets the PolicyManager via queue name. Null if there is no policy
 * configured for the specified queue. The PolicyManager can be used to
 * extract the {@link FederationRouterPolicy} and
 * {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters
 *
 * {@link #setPolicyManager(FederationPolicyManager)}
 * Sets the PolicyManager. If the policy configuration is the same, no change
 * occurs. Otherwise, the internal cache is updated and the new configuration
 * is written into the FederationStateStore
 *
 * This class assumes that the GPG is the only service
 * writing policies. Thus, the only FederationStateStore reads occur the first
 * time a queue policy is retrieved - after that, the GPG only writes to the
 * FederationStateStore.
 *
 * The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
 * cache. The primary use for these caches are to serve reads, and to
 * identify when the PolicyGenerator has actually changed the policy
 * so unnecessary FederationStateStore policy writes can be avoided.
 */

public class GPGPolicyFacade {

  private static final Logger LOG =
      LoggerFactory.getLogger(GPGPolicyFacade.class);

  private FederationStateStoreFacade stateStore;

  private Map<String, FederationPolicyManager> policyManagerMap;
  private Map<String, SubClusterPolicyConfiguration> policyConfMap;

  private boolean readOnly;

  public GPGPolicyFacade(FederationStateStoreFacade stateStore,
      Configuration conf) {
    this.stateStore = stateStore;
    this.policyManagerMap = new HashMap<>();
    this.policyConfMap = new HashMap<>();
    this.readOnly =
        conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
            YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
  }

  /**
   * Provides a utility for the policy generator to read the policy manager
   * from the FederationStateStore. Because the policy generator should be the
   * only component updating the policy, this implementation does not use the
   * reinitialization feature.
   *
   * @param queueName the name of the queue we want the policy manager for.
   * @return the policy manager responsible for the queue policy.
   * @throws YarnException exceptions from yarn servers.
   */
  public FederationPolicyManager getPolicyManager(String queueName)
      throws YarnException {
    FederationPolicyManager policyManager = policyManagerMap.get(queueName);

    // If we don't have the policy manager cached, pull configuration
    // from the FederationStateStore to create and cache it
    if (policyManager == null) {
      try {

        // If we don't have the configuration cached, pull it
        // from the stateStore
        SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);

        if (conf == null) {
          conf = stateStore.getPolicyConfiguration(queueName);
        }

        // If configuration is still null, it does not exist in the
        // FederationStateStore
        if (conf == null) {
          LOG.info("Read null policy for queue {}.", queueName);
          return null;
        }

        // Generate PolicyManager based on PolicyManagerType.
        String policyManagerType = conf.getType();
        policyManager = FederationPolicyUtils.instantiatePolicyManager(policyManagerType);
        policyManager.setQueue(queueName);

        // If PolicyManager supports Weighted PolicyInfo, it means that
        // we need to use this parameter to determine which sub-cluster the router goes to
        // or which sub-cluster the container goes to.
        if (policyManager.isSupportWeightedPolicyInfo()) {
          ByteBuffer weightedPolicyInfoParams = conf.getParams();
          if (weightedPolicyInfoParams == null) {
            LOG.warn("Warning: Queue = {}, FederationPolicyManager {} WeightedPolicyInfo is empty.",
                queueName, policyManagerType);
            return null;
          }
          WeightedPolicyInfo weightedPolicyInfo =
              WeightedPolicyInfo.fromByteBuffer(conf.getParams());
          policyManager.setWeightedPolicyInfo(weightedPolicyInfo);
        } else {
          LOG.warn("Warning: FederationPolicyManager of unsupported WeightedPolicyInfo type {}, " +
              "initialization may be incomplete.", policyManager.getClass());
        }

        policyManagerMap.put(queueName, policyManager);
        policyConfMap.put(queueName, conf);
      } catch (YarnException e) {
        LOG.error("Error reading SubClusterPolicyConfiguration from state "
            + "store for queue: {}", queueName);
        throw e;
      }
    }
    return policyManager;
  }

  /**
   * Provides a utility for the policy generator to write a policy manager
   * into the FederationStateStore. The facade keeps a cache and will only write
   * into the FederationStateStore if the policy configuration has changed.
   *
   * @param policyManager The policy manager we want to update into the state
   *                      store. It contains policy information as well as
   *                      the queue name we will update for.
   * @throws YarnException  exceptions from yarn servers.
   */
  public void setPolicyManager(FederationPolicyManager policyManager)
      throws YarnException {
    if (policyManager == null) {
      LOG.warn("Attempting to set null policy manager");
      return;
    }
    // Extract the configuration from the policy manager
    String queue = policyManager.getQueue();
    SubClusterPolicyConfiguration conf;
    try {
      conf = policyManager.serializeConf();
    } catch (FederationPolicyInitializationException e) {
      LOG.warn("Error serializing policy for queue {}", queue);
      throw e;
    }
    if (conf == null) {
      // State store does not currently support setting a policy back to null
      // because it reads the queue name to set from the policy!
      LOG.warn("Skip setting policy to null for queue {} into state store",
          queue);
      return;
    }
    // Compare with configuration cache, if different, write the conf into
    // store and update our conf and manager cache
    if (!confCacheEqual(queue, conf)) {
      try {
        if (readOnly) {
          LOG.info("[read-only] Skipping policy update for queue {}", queue);
          return;
        }
        LOG.info("Updating policy for queue {} into state store", queue);
        stateStore.setPolicyConfiguration(conf);
        policyConfMap.put(queue, conf);
        policyManagerMap.put(queue, policyManager);
      } catch (YarnException e) {
        LOG.warn("Error writing SubClusterPolicyConfiguration to state "
            + "store for queue: {}", queue);
        throw e;
      }
    } else {
      LOG.info("Setting unchanged policy - state store write skipped");
    }
  }

  /**
   * @param queue the queue to check the cached policy configuration for
   * @param conf the new policy configuration
   * @return whether or not the conf is equal to the cached conf
   */
  private boolean confCacheEqual(String queue,
      SubClusterPolicyConfiguration conf) {
    SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
    if (conf == null && cachedConf == null) {
      return true;
    } else if (conf != null && cachedConf != null) {
      if (conf.equals(cachedConf)) {
        return true;
      }
    }
    return false;
  }
}