IntelFpgaOpenclPlugin.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/**
* Intel FPGA for OpenCL plugin.
* The key points are:
* 1. It uses Intel's toolchain "aocl" to discover devices and to reprogram
* the IP onto the device before container launch, to achieve the quickest
* reprogramming path
* 2. It avoids reprogramming by maintaining a mapping of device to FPGA IP ID
* 3. It assumes the IP file is distributed to the container directory
*/
public class IntelFpgaOpenclPlugin implements AbstractFpgaVendorPlugin {
  private static final Logger LOG = LoggerFactory.getLogger(
      IntelFpgaOpenclPlugin.class);

  /** Name of the toolchain binary used when no better path can be found. */
  private static final String DEFAULT_BINARY_NAME = "aocl";

  /** Environment variable consulted for the SDK root directory. */
  private static final String ALTERAOCLSDKROOT_NAME = "ALTERAOCLSDKROOT";

  /** File name suffix an IP file must have to match a requested IP id. */
  private static final String IP_FILE_SUFFIX = ".aocx";

  /**
   * Timeout handed to the diagnose run performed by {@link #initPlugin}.
   * NOTE(review): presumably milliseconds (it is forwarded to
   * Shell.ShellCommandExecutor) - confirm against the Hadoop Shell docs.
   */
  private static final int INIT_DIAGNOSE_TIMEOUT = 10 * 1000;

  private boolean initialized = false;
  private InnerShellExecutor shell;
  private Function<String, String> envProvider = System::getenv;
  private String pathToExecutable = null;

  public IntelFpgaOpenclPlugin() {
    this.shell = new InnerShellExecutor();
  }

  @VisibleForTesting
  void setInnerShellExecutor(InnerShellExecutor shellExecutor) {
    this.shell = shellExecutor;
  }

  @VisibleForTesting
  String getPathToExecutable() {
    return pathToExecutable;
  }

  @VisibleForTesting
  void setEnvProvider(Function<String, String> envProvider) {
    this.envProvider = envProvider;
  }

  /**
   * Look up the plugin's preferred toolchain location.
   * Despite the name, this returns the value of the
   * {@code ALTERAOCLSDKROOT} environment variable (as seen through the
   * configured environment provider), not the path of the binary itself;
   * {@link #initPlugin} appends {@code /bin/aocl} to it.
   *
   * @return the value of the environment variable, or whatever the
   *         environment provider yields when it is not set
   *         ({@code null} for the default {@code System.getenv})
   */
  public String getDefaultPathToExecutable() {
    return envProvider.apply(ALTERAOCLSDKROOT_NAME);
  }

  /**
   * Check the Intel FPGA for OpenCL toolchain.
   * Resolves the path of the "aocl" binary and runs a diagnose with it.
   * Once a diagnose has passed, later calls return {@code true} without
   * doing any work.
   *
   * @param config configuration that may carry the path to the executable
   *               under {@code YarnConfiguration.NM_FPGA_PATH_TO_EXEC}
   * @return true if the diagnose passed (now or in an earlier call)
   * */
  @Override
  public boolean initPlugin(Configuration config) {
    if (initialized) {
      return true;
    }
    pathToExecutable = resolveExecutable(config);
    initialized = diagnose(INIT_DIAGNOSE_TIMEOUT);
    if (!initialized) {
      LOG.warn("Intel FPGA for OpenCL diagnose failed!");
    }
    return initialized;
  }

  /**
   * Find the proper toolchain binary, mainly aocl. The lookup order is:
   * the configured path if that file exists, then {@code <SDK root>/bin/aocl}
   * if that file exists, and finally the bare binary name "aocl".
   */
  private String resolveExecutable(Configuration config) {
    String configured = config.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC,
        DEFAULT_BINARY_NAME);
    if (new File(configured).exists()) {
      return configured;
    }
    LOG.warn("Failed to find FPGA discoverer executable configured in " +
        YarnConfiguration.NM_FPGA_PATH_TO_EXEC +
        ", please check! Try default path");

    // Try to find in plugin's preferred path
    String preferredRoot = getDefaultPathToExecutable();
    if (null == preferredRoot) {
      LOG.warn("Failed to find FPGA discoverer executable from system "
          + " environment " + ALTERAOCLSDKROOT_NAME +
          ", please check your environment!");
      return DEFAULT_BINARY_NAME;
    }

    File candidate = new File(preferredRoot + "/bin", DEFAULT_BINARY_NAME);
    if (candidate.exists()) {
      String found = candidate.getAbsolutePath();
      LOG.info("Succeed in finding FPGA discoverer executable: " + found);
      return found;
    }
    LOG.warn("Failed to find FPGA discoverer executable in " +
        preferredRoot +
        ", file doesn't exists! Use default binary " + DEFAULT_BINARY_NAME);
    return DEFAULT_BINARY_NAME;
  }

  /**
   * Discover devices by parsing the output of a diagnose run.
   *
   * @param timeout timeout forwarded to the diagnose command
   * @return the parsed devices; an empty list when no diagnose output
   *         is available
   */
  @Override
  public List<FpgaDevice> discover(int timeout) {
    String output = getDiagnoseInfo(timeout);
    if (null == output) {
      return new LinkedList<>();
    }
    return AoclDiagnosticOutputParser.parseDiagnosticOutput(output,
        shell, getFpgaType());
  }

  /**
   * Helper class to run aocl diagnose & determine major/minor numbers.
   */
  public static class InnerShellExecutor {

    /**
     * Run {@code stat -c %t:%T /dev/<devName>} and convert the two
     * hexadecimal fields it prints to decimal.
     *
     * @param devName device name below /dev
     * @return a string in format {@code <major:minor>} (decimal), or null
     *         if the command fails or its output cannot be parsed
     */
    public String getMajorAndMinorNumber(String devName) {
      String output = null;
      Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
          new String[]{"stat", "-c", "%t:%T", "/dev/" + devName});
      try {
        LOG.debug("Get FPGA major-minor numbers from /dev/{}", devName);
        shexec.execute();
        String statOutput = shexec.getOutput();
        LOG.debug("stat output:{}", statOutput);
        String[] strs = statOutput.trim().split(":");
        // Guard against output that does not carry both fields, instead of
        // letting an ArrayIndexOutOfBoundsException escape to the caller.
        if (strs.length < 2) {
          LOG.warn("Unexpected stat output for /dev/" + devName + ": " +
              statOutput);
          return null;
        }
        output = Integer.parseInt(strs[0], 16) + ":" +
            Integer.parseInt(strs[1], 16);
      } catch (NumberFormatException e) {
        // Non-hexadecimal fields are reported like any other failure: null.
        LOG.warn("Failed to parse major-minor number of /dev/" + devName +
            " from stat output:" + shexec.getOutput(), e);
      } catch (IOException e) {
        LOG.warn("Failed to get major-minor number from reading /dev/" +
            devName);
        LOG.warn("Command output:" + shexec.getOutput() + ", exit code: " +
            shexec.getExitCode(), e);
      }
      return output;
    }

    /**
     * Run {@code <binary> diagnose} and return whatever it printed.
     *
     * @param binary path or name of the aocl executable
     * @param timeout timeout handed to the shell command executor
     * @return the command output, also when the command reported a failure
     */
    public String runDiagnose(String binary, int timeout) {
      Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
          new String[]{binary, "diagnose"}, null, null, timeout);
      try {
        shexec.execute();
      } catch (IOException e) {
        // aocl diagnose exit code is 1 even it success.
        // we ignore it because we only wants the output
        String msg =
            "Failed to execute " + binary + " diagnose, exception message:"
                + e.getMessage() + ", continue ...";
        LOG.warn(msg);
        LOG.debug("{}", shexec.getOutput());
      }
      return shexec.getOutput();
    }
  }

  /**
   * Run the diagnose command with the currently resolved executable.
   *
   * @param timeout timeout forwarded to the diagnose command
   * @return the output of the diagnose command
   */
  public String getDiagnoseInfo(int timeout) {
    return this.shell.runDiagnose(this.pathToExecutable, timeout);
  }

  /**
   * Run a diagnose and check its output for the "DIAGNOSTIC_PASSED" marker.
   *
   * @param timeout timeout forwarded to the diagnose command
   * @return true if the output contains the marker
   */
  @Override
  public boolean diagnose(int timeout) {
    String output = getDiagnoseInfo(timeout);
    return null != output && output.contains("DIAGNOSTIC_PASSED");
  }

  /**
   * this is actually the opencl platform type
   * */
  @Override
  public String getFpgaType() {
    return "IntelOpenCL";
  }

  /**
   * Search the localized resources for the IP file of the requested id.
   * A resource matches when its file name equals {@code <id>.aocx},
   * ignoring case.
   *
   * @param id the requested IP id
   * @param dstDir not used by this implementation
   * @param localizedResources localized resources keyed by path; may be null
   * @return the path of the first matching resource as a string, or null if
   *         the id is null/empty, there are no resources or nothing matches
   */
  @Override
  public String retrieveIPfilePath(String id, String dstDir,
      Map<Path, List<String>> localizedResources) {
    // Assume .aocx IP file is distributed by DS to local dir
    LOG.info("Got environment: " + id +
        ", search IP file in localized resources");
    if (null == id || id.isEmpty()) {
      LOG.warn("IP_ID environment is empty, skip downloading");
      return null;
    }
    if (localizedResources == null) {
      LOG.warn("Localized resource is null!");
      return null;
    }

    Optional<Path> aocxPath = localizedResources
        .keySet()
        .stream()
        .filter(path -> matchesIpid(path, id))
        .findFirst();
    if (!aocxPath.isPresent()) {
      LOG.warn("Requested IP file not found");
      return null;
    }
    String ipFilePath = aocxPath.get().toString();
    LOG.info("Found: {}", ipFilePath);
    return ipFilePath;
  }

  /**
   * Tell whether the file name of the path is {@code <id>.aocx}. The
   * comparison ignores case and, unlike a default-locale toLowerCase(),
   * does not depend on the locale of the JVM.
   */
  private boolean matchesIpid(Path p, String id) {
    return p.getName().equalsIgnoreCase(id + IP_FILE_SUFFIX);
  }

  /**
   * Program one device.
   * It's ok for the offline "aocl program" failed because the application will
   * always invoke API to program.
   * The reason we do offline reprogramming is to make the application's
   * program process faster.
   * @param ipPath the absolute path to the aocx IP file
   * @param device Fpga device object which represents the card
   * @return false if programming the card fails
   * */
  @Override
  public boolean configureIP(String ipPath, FpgaDevice device) {
    // A null argument cannot be turned into a command line; report it as a
    // programming failure instead of failing while launching the process.
    if (ipPath == null || device == null
        || device.getAliasDevName() == null) {
      LOG.error("Intel aocl program skipped, IP file path or device alias" +
          " is missing");
      return false;
    }

    // perform offline program the IP to get a quickest reprogramming sequence
    // we need a mapping of "major:minor" to "acl0" to
    // issue command "aocl program <acl0> <ipPath>"
    String aclName = device.getAliasDevName();
    Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
        new String[]{this.pathToExecutable, "program", aclName, ipPath});
    try {
      shexec.execute();
      if (0 != shexec.getExitCode()) {
        LOG.error("Device programming failed, aocl output is:");
        LOG.error(shexec.getOutput());
        return false;
      }
      LOG.debug("{}", shexec.getOutput());
      LOG.info("Intel aocl program " + ipPath + " to " +
          aclName + " successfully");
      return true;
    } catch (IOException e) {
      LOG.error("Intel aocl program " + ipPath + " to " +
          aclName + " failed!", e);
      LOG.error("Aocl output: " + shexec.getOutput());
      return false;
    }
  }
}