ResourcePluginManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceMappingManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuNodeResourceUpdateHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;

/**
 * Manages {@link ResourcePlugin} configured on this NodeManager.
 *
 * <p>{@link #initialize(Context)} loads two kinds of plugins:
 * <ul>
 *   <li>built-in plugins named in
 *   {@link YarnConfiguration#NM_RESOURCE_PLUGINS} (only GPU and FPGA are
 *   supported), and</li>
 *   <li>when the pluggable device framework is enabled, vendor
 *   {@link DevicePlugin} classes named in
 *   {@link YarnConfiguration#NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES},
 *   each wrapped in a {@link DevicePluginAdapter}.</li>
 * </ul>
 * The resulting resource-name-to-plugin map is exposed read-only through
 * {@link #getNameToPlugins()}.
 */
public class ResourcePluginManager {
  private static final Logger LOG =
      LoggerFactory.getLogger(ResourcePluginManager.class);
  private static final Set<String> SUPPORTED_RESOURCE_PLUGINS =
      ImmutableSet.of(GPU_URI, FPGA_URI);

  // Replaced wholesale (never mutated in place) by initialize(). volatile
  // because initialize() publishes it without holding the lock taken by
  // getNameToPlugins(), so that lock alone gives readers no visibility.
  private volatile Map<String, ResourcePlugin> configuredPlugins =
      Collections.emptyMap();

  // Created lazily by initializePluggableDevicePlugins() unless injected
  // earlier through setDeviceMappingManager().
  private DeviceMappingManager deviceMappingManager = null;

  /**
   * Loads and initializes every resource plugin configured for this node and
   * publishes them through {@link #getNameToPlugins()}.
   *
   * @param context NodeManager context; supplies the configuration and is
   *                handed to each plugin's {@code initialize}.
   * @throws YarnException if a built-in plugin is unsupported or fails to
   *                       initialize.
   * @throws ClassNotFoundException if a configured device plugin class cannot
   *                                be loaded.
   */
  public void initialize(Context context)
      throws YarnException, ClassNotFoundException {
    Configuration conf = context.getConf();
    String[] plugins = getPluginsFromConfig(conf);

    Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
    if (plugins != null) {
      pluginMap = initializePlugins(conf, context, plugins);
    }

    // Try to load pluggable device plugins
    boolean pluggableDeviceFrameworkEnabled = conf.getBoolean(
        YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
        YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);

    if (pluggableDeviceFrameworkEnabled) {
      initializePluggableDevicePlugins(context, conf, pluginMap);
    } else {
      LOG.info("The pluggable device framework is not enabled."
              + " If you want, please set true to {}",
          YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
    }
    configuredPlugins = Collections.unmodifiableMap(pluginMap);
  }

  /**
   * Reads the built-in resource plugin names from
   * {@link YarnConfiguration#NM_RESOURCE_PLUGINS} and logs the outcome once.
   *
   * @return the configured names as returned by {@code getStrings}
   *         (possibly {@code null}).
   */
  private String[] getPluginsFromConfig(Configuration conf) {
    String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
    if (plugins == null || plugins.length == 0) {
      LOG.info("No Resource plugins found from configuration!");
    } else {
      LOG.info("Found Resource plugins from configuration: {}",
          Arrays.toString(plugins));
    }
    return plugins;
  }

  /**
   * Creates and initializes the built-in plugins named in {@code plugins}.
   * Names are trimmed; repeated names are ignored with a warning.
   *
   * @return a mutable map from resource name to initialized plugin.
   * @throws YarnException if a name is not supported or a plugin fails to
   *                       initialize.
   */
  private Map<String, ResourcePlugin> initializePlugins(Configuration conf,
      Context context, String[] plugins) throws YarnException {
    Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();

    for (String rawName : plugins) {
      String resourceName = rawName.trim();
      ensurePluginIsSupported(resourceName);
      if (isPluginDuplicate(pluginMap, resourceName)) {
        continue;
      }

      ResourcePlugin plugin = createBuiltInPlugin(resourceName, conf);
      plugin.initialize(context);
      LOG.info("Initialized plugin {}", plugin);
      pluginMap.put(resourceName, plugin);
    }
    return pluginMap;
  }

  // Builds (without initializing) the built-in plugin for resourceName.
  private static ResourcePlugin createBuiltInPlugin(String resourceName,
      Configuration conf) throws YarnException {
    if (resourceName.equals(GPU_URI)) {
      final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer();
      final GpuNodeResourceUpdateHandler updateHandler =
          new GpuNodeResourceUpdateHandler(gpuDiscoverer, conf);
      return new GpuResourcePlugin(updateHandler, gpuDiscoverer);
    }
    if (resourceName.equals(FPGA_URI)) {
      return new FpgaResourcePlugin();
    }
    // Only reachable if SUPPORTED_RESOURCE_PLUGINS gains an entry that has
    // no branch above.
    throw new YarnException(
        "This shouldn't happen, plugin=" + resourceName
            + " should be loaded and initialized");
  }

  /**
   * Fails if {@code resourceName} is not one of the built-in plugins.
   *
   * @throws YarnException listing the supported plugin names.
   */
  private void ensurePluginIsSupported(String resourceName)
      throws YarnException {
    if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
      // commons-lang3 takes (Iterable, separator); the reversed argument
      // order binds to the varargs overload and prints ",[...]".
      String msg =
          "Trying to initialize resource plugin with name=" + resourceName
              + ", it is not supported, list of supported plugins:"
              + StringUtils.join(SUPPORTED_RESOURCE_PLUGINS, ",");
      LOG.error(msg);
      throw new YarnException(msg);
    }
  }

  /**
   * Returns {@code true} (and logs a warning) if {@code resourceName} is
   * already present in {@code pluginMap}.
   */
  private boolean isPluginDuplicate(Map<String, ResourcePlugin> pluginMap,
      String resourceName) {
    if (pluginMap.containsKey(resourceName)) {
      LOG.warn("Ignoring duplicate Resource plugin definition: {}",
          resourceName);
      return true;
    }
    return false;
  }

  /**
   * Loads every vendor device plugin class named in
   * {@link YarnConfiguration#NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES}
   * and adds an adapter for each to {@code pluginMap}, keyed by the resource
   * name the plugin registers.
   *
   * @param context       NodeManager context passed to each adapter.
   * @param configuration configuration used to read the class list and to
   *                      instantiate the plugins.
   * @param pluginMap     map of already-loaded plugins; updated in place.
   * @throws YarnRuntimeException if the class list is not set, a class is
   *         not a {@link DevicePlugin} or fails the compatibility check,
   *         registration fails, the resource name is already taken or is not
   *         configured as a resource type, or the adapter fails to initialize.
   * @throws ClassNotFoundException if a named class cannot be loaded.
   */
  public void initializePluggableDevicePlugins(Context context,
      Configuration configuration,
      Map<String, ResourcePlugin> pluginMap)
      throws YarnRuntimeException, ClassNotFoundException {
    LOG.info("The pluggable device framework enabled, "
        + "trying to load the vendor plugins");
    if (null == deviceMappingManager) {
      LOG.debug("DeviceMappingManager initialized.");
      deviceMappingManager = new DeviceMappingManager(context);
    }
    String[] pluginClassNames = configuration.getStrings(
        YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
    if (null == pluginClassNames) {
      throw new YarnRuntimeException("Null value found in configuration: "
          + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
    }

    for (String rawClassName : pluginClassNames) {
      // getStrings() splits on commas but does not trim, so "a, b" would
      // otherwise hand " b" to Class.forName.
      String pluginClassName = rawClassName.trim();
      if (pluginClassName.isEmpty()) {
        continue;
      }
      loadDevicePlugin(context, configuration, pluginMap, pluginClassName);
    }
  }

  /**
   * Loads one vendor device plugin class, validates it and the resource type
   * it registers, wraps it in a {@link DevicePluginAdapter} and stores the
   * adapter in {@code pluginMap}. A plugin that also implements
   * {@link DevicePluginScheduler} is added to the device mapping manager as
   * the scheduler for its resource.
   */
  private void loadDevicePlugin(Context context, Configuration configuration,
      Map<String, ResourcePlugin> pluginMap, String pluginClassName)
      throws ClassNotFoundException {
    Class<?> pluginClazz = Class.forName(pluginClassName);
    if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) {
      throw new YarnRuntimeException("Class: " + pluginClassName
          + " not instance of " + DevicePlugin.class.getCanonicalName());
    }
    // sanity-check before initialization
    checkInterfaceCompatibility(DevicePlugin.class, pluginClazz);

    DevicePlugin dpInstance =
        (DevicePlugin) ReflectionUtils.newInstance(pluginClazz, configuration);

    String resourceName = requestResourceName(dpInstance, pluginClassName);
    // check if someone has already registered this resource type name
    if (pluginMap.containsKey(resourceName)) {
      throw new YarnRuntimeException(resourceName
          + " already registered! Please change resource type name"
          + " or configure correct resource type name"
          + " in resource-types.xml for "
          + pluginClassName);
    }
    // check resource name is valid and configured in resource-types.xml
    if (!isConfiguredResourceName(resourceName)) {
      throw new YarnRuntimeException(resourceName
          + " is not configured inside "
          + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE
          + " , please configure it first");
    }
    LOG.info("New resource type: {} registered successfully by {}",
        resourceName,
        pluginClassName);

    DevicePluginAdapter pluginAdapter = new DevicePluginAdapter(
        resourceName, dpInstance, deviceMappingManager);
    LOG.info("Adapter of {} created. Initializing..", pluginClassName);
    try {
      pluginAdapter.initialize(context);
    } catch (YarnException e) {
      throw new YarnRuntimeException("Adapter of "
          + pluginClassName + " init failed!", e);
    }
    LOG.info("Adapter of {} init success!", pluginClassName);
    // Store plugin as adapter instance
    pluginMap.put(resourceName, pluginAdapter);

    if (dpInstance instanceof DevicePluginScheduler) {
      // check DevicePluginScheduler interface compatibility
      checkInterfaceCompatibility(DevicePluginScheduler.class, pluginClazz);
      LOG.info(
          "{} can schedule {} devices. "
              + "Added as preferred device plugin scheduler",
          pluginClassName,
          resourceName);
      deviceMappingManager.addDevicePluginScheduler(
          resourceName,
          (DevicePluginScheduler) dpInstance);
    }
  }

  /**
   * Asks the plugin for its register request and returns the resource name
   * it claims.
   *
   * @throws YarnRuntimeException if the plugin throws (the original
   *         exception is kept as the cause) or returns no request.
   */
  private static String requestResourceName(DevicePlugin dpInstance,
      String pluginClassName) {
    // TODO: handle the plugin method timeout issue
    DeviceRegisterRequest request;
    try {
      request = dpInstance.getRegisterRequestInfo();
    } catch (Exception e) {
      throw new YarnRuntimeException("Exception thrown from plugin's"
          + " getRegisterRequestInfo:"
          + e.getMessage(), e);
    }
    if (request == null) {
      throw new YarnRuntimeException("Plugin " + pluginClassName
          + " returned a null register request from getRegisterRequestInfo");
    }
    return request.getResourceName();
  }

  /**
   * Checks that {@code actualClass} declares a method with the name of every
   * public method of {@code expectedClass}.
   *
   * <p>NOTE(review): matching is by name only, and only methods declared
   * directly on {@code actualClass} are searched ({@code getDeclaredMethods}
   * excludes inherited ones), so an implementation inherited from a
   * superclass is reported as missing.
   *
   * @throws YarnRuntimeException naming the first missing method.
   */
  @VisibleForTesting
  public void checkInterfaceCompatibility(Class<?> expectedClass,
      Class<?> actualClass) throws YarnRuntimeException {
    LOG.debug("Checking implemented interface's compatibility: {}",
        expectedClass.getSimpleName());
    // getDeclaredMethods() builds a new array on each call; fetch it once.
    Method[] actualMethods = actualClass.getDeclaredMethods();

    for (Method expected : expectedClass.getMethods()) {
      String methodName = expected.getName();
      LOG.debug("Try to find method: {}", methodName);
      boolean found = false;
      for (Method candidate : actualMethods) {
        if (candidate.getName().equals(methodName)) {
          LOG.debug("Method {} found in class {}",
              candidate.getName(), actualClass.getSimpleName());
          found = true;
          break;
        }
      }
      if (!found) {
        LOG.error("Method {} is not found in plugin", methodName);
        throw new YarnRuntimeException(
            "Method " + methodName
                + " is expected but not implemented in "
                + actualClass.getCanonicalName());
      }
    }
    LOG.info("{} compatibility is ok.",
        expectedClass.getSimpleName());
  }

  /**
   * Returns whether {@code resourceName} is one of the resource types known
   * to {@link ResourceUtils#getResourceTypes()}.
   */
  @VisibleForTesting
  public boolean isConfiguredResourceName(String resourceName) {
    return ResourceUtils.getResourceTypes().containsKey(resourceName);
  }

  @VisibleForTesting
  public void setDeviceMappingManager(
      DeviceMappingManager deviceMappingManager) {
    this.deviceMappingManager = deviceMappingManager;
  }

  public DeviceMappingManager getDeviceMappingManager() {
    return deviceMappingManager;
  }

  /**
   * Calls {@code cleanup()} on every configured plugin. Every plugin is
   * attempted even if an earlier one fails.
   *
   * @throws YarnException the first cleanup failure; any later failures are
   *                       attached to it as suppressed exceptions.
   */
  public void cleanup() throws YarnException {
    YarnException firstFailure = null;
    for (ResourcePlugin plugin : configuredPlugins.values()) {
      try {
        plugin.cleanup();
      } catch (YarnException e) {
        // Keep going so one failing plugin does not leave the rest
        // uncleaned; the first failure is rethrown below.
        LOG.warn("Failed to clean up plugin {}", plugin, e);
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * Get resource name (such as gpu/fpga) to plugin references.
   * @return read-only map of resource name to plugins.
   */
  public synchronized Map<String, ResourcePlugin> getNameToPlugins() {
    return configuredPlugins;
  }
}