VEDeviceDiscoverer.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
/**
 * Discovers VE devices by scanning a directory for entries whose name starts
 * with {@code veslot} and converting each of them into a {@link Device}.
 *
 * <p>For every matching entry the discoverer runs {@code stat} to obtain the
 * major/minor numbers and the file type, asks {@link UdevUtil} for the
 * matching sys path, and reads the {@code os_state} file located under that
 * path to determine the state of the device.
 */
class VEDeviceDiscoverer {
  private static final String STATE_TERMINATING = "TERMINATING";
  private static final String STATE_INITIALIZING = "INITIALIZING";
  private static final String STATE_OFFLINE = "OFFLINE";
  private static final String STATE_ONLINE = "ONLINE";
  private static final Logger LOG =
      LoggerFactory.getLogger(VEDeviceDiscoverer.class);

  /** State names, indexed by the numeric value read from {@code os_state}. */
  private static final String[] DEVICE_STATE = {STATE_ONLINE, STATE_OFFLINE,
      STATE_INITIALIZING, STATE_TERMINATING};

  /** Number of colon-separated fields produced by the stat format string. */
  private static final int STAT_FIELD_COUNT = 3;

  private final UdevUtil udev;

  /** Factory for the stat command executor; replaceable for testing. */
  private Function<String[], CommandExecutor>
      commandExecutorProvider = this::createCommandExecutor;

  /**
   * Creates a discoverer.
   *
   * @param udevUtil helper used to resolve a device number to its sys path
   */
  VEDeviceDiscoverer(UdevUtil udevUtil) {
    udev = udevUtil;
  }

  /**
   * Collects all devices found directly under the given directory.
   *
   * <p>Only the directory itself and its immediate children are visited
   * (walk depth 1); entries whose file name does not start with
   * {@code veslot} are ignored. Device IDs are assigned from 0 upwards in the
   * order in which the entries are encountered by the directory walk.
   *
   * @param path directory to scan
   * @return the set of discovered devices, possibly empty
   * @throws IOException if the directory cannot be walked
   * @throws UncheckedIOException if running {@code stat} or reading the
   *         device state fails for an entry
   * @throws IllegalArgumentException if an entry is neither a character nor
   *         a block device, or the {@code stat} output cannot be parsed
   */
  public Set<Device> getDevicesFromPath(String path) throws IOException {
    MutableInt counter = new MutableInt(0);

    try (Stream<Path> stream = Files.walk(Paths.get(path), 1)) {
      return stream.filter(p -> p.toFile().getName().startsWith("veslot"))
          .map(p -> toDevice(p, counter))
          .collect(Collectors.toSet());
    }
  }

  /**
   * Builds a {@link Device} for a single device file.
   *
   * @param p path of the device file
   * @param counter source of the next device ID; incremented only after all
   *        information about the device has been gathered successfully
   * @return the populated device
   */
  private Device toDevice(Path p, MutableInt counter) {
    // -L follows symbolic links; the format prints the major and minor
    // numbers in hex followed by the textual file type.
    CommandExecutor executor =
        commandExecutorProvider.apply(
            new String[]{"stat", "-L", "-c", "%t:%T:%F", p.toString()});

    try {
      LOG.info("Checking device file: {}", p);
      executor.execute();
      String statOutput = executor.getOutput();

      String[] stat = statOutput.trim().split(":");
      if (stat.length < STAT_FIELD_COUNT) {
        // Fail with a descriptive message instead of an
        // ArrayIndexOutOfBoundsException further down.
        throw new IllegalArgumentException(
            "Unexpected output from stat for " + p + ": " + statOutput);
      }

      int major = Integer.parseInt(stat[0], 16);
      int minor = Integer.parseInt(stat[1], 16);
      char devType = getDevType(p, stat[2]);
      int deviceNumber = makeDev(major, minor);
      LOG.info("Device: major: {}, minor: {}, devNo: {}, type: {}",
          major, minor, deviceNumber, devType);

      String sysPath = udev.getSysPath(deviceNumber, devType);
      LOG.info("Device syspath: {}", sysPath);
      String deviceState = getDeviceState(sysPath);

      Device.Builder builder = Device.Builder.newInstance();
      builder.setId(counter.getAndIncrement())
          .setMajorNumber(major)
          .setMinorNumber(minor)
          .setHealthy(STATE_ONLINE.equalsIgnoreCase(deviceState))
          .setStatus(deviceState)
          .setDevPath(p.toAbsolutePath().toString());

      return builder.build();
    } catch (IOException e) {
      throw new UncheckedIOException("Cannot execute stat command", e);
    }
  }

  /**
   * Combines a major and a minor number into a single device number.
   *
   * <p>The low 8 bits of the minor number occupy bits 0-7, the major number
   * starts at bit 8 and the remaining minor bits start at bit 20, which is
   * the layout used by glibc's {@code makedev} for the part that fits into
   * 32 bits. For minor numbers in the range 0-255 the result is equal to
   * {@code major * 256 + minor}; larger minor numbers no longer overflow
   * into the bits that hold the major number.
   *
   * @param major major device number
   * @param minor minor device number
   * @return the combined device number
   */
  private int makeDev(int major, int minor) {
    return (major << 8) | (minor & 0xff) | ((minor & ~0xff) << 12);
  }

  /**
   * Maps the file type text printed by {@code stat} to a device type code.
   *
   * @param p path of the inspected file, used for the error message only
   * @param fromStat file type description taken from the stat output
   * @return {@code 'c'} for a character device, {@code 'b'} for a block
   *         device
   * @throws IllegalArgumentException if the description matches neither
   */
  private char getDevType(Path p, String fromStat) {
    if (fromStat.contains("character")) {
      return 'c';
    } else if (fromStat.contains("block")) {
      return 'b';
    } else {
      throw new IllegalArgumentException(
          "File is neither a char nor block device: " + p);
    }
  }

  /**
   * Reads the device state from the {@code os_state} file under the given
   * sys path.
   *
   * <p>The first byte of the file is used as an index into
   * {@link #DEVICE_STATE}. A value outside of that table, or an empty file
   * (reported as -1), yields {@code "Unknown (<value>)"}.
   *
   * @param sysPath sys path of the device
   * @return the state name, or an "Unknown" marker containing the raw value
   * @throws IOException if the state file cannot be opened or read
   */
  private String getDeviceState(String sysPath) throws IOException {
    Path statePath = Paths.get(sysPath, "os_state");

    try (FileInputStream fis =
        new FileInputStream(statePath.toString())) {
      // Keep the int returned by read(): narrowing it to a byte would turn
      // values above 127 into negative numbers in the message below.
      int state = fis.read();

      if (state < 0 || DEVICE_STATE.length <= state) {
        return String.format("Unknown (%d)", state);
      } else {
        return DEVICE_STATE[state];
      }
    }
  }

  /**
   * Default executor factory: runs the command through a shell command
   * executor.
   *
   * @param command command and its arguments
   * @return an executor for the command
   */
  private CommandExecutor createCommandExecutor(String[] command) {
    return new Shell.ShellCommandExecutor(
        command);
  }

  /**
   * Replaces the factory used to create the stat command executor.
   *
   * @param provider factory to use from now on
   */
  @VisibleForTesting
  void setCommandExecutorProvider(
      Function<String[], CommandExecutor> provider) {
    this.commandExecutorProvider = provider;
  }
}