RMAdminProtocolMethod.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationMethodWrapper;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Class to define admin method, params and arguments.
*/
public class RMAdminProtocolMethod extends FederationMethodWrapper {
private static final Logger LOG =
LoggerFactory.getLogger(RMAdminProtocolMethod.class);
private FederationStateStoreFacade federationFacade;
private FederationRMAdminInterceptor rmAdminInterceptor;
private Configuration configuration;
public RMAdminProtocolMethod(Class<?>[] pTypes, Object... pParams)
throws IOException {
super(pTypes, pParams);
}
public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor interceptor,
Class<R> clazz, String subClusterId) throws YarnException {
this.rmAdminInterceptor = interceptor;
this.federationFacade = FederationStateStoreFacade.getInstance(interceptor.getConf());
this.configuration = interceptor.getConf();
if (StringUtils.isNotBlank(subClusterId)) {
return invoke(clazz, subClusterId);
} else {
return invokeConcurrent(clazz);
}
}
@Override
protected <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException {
String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
this.setMethodName(methodName);
ThreadPoolExecutor executorService = rmAdminInterceptor.getExecutorService();
// Get Active SubClusters
Map<SubClusterId, SubClusterInfo> subClusterInfo =
federationFacade.getSubClusters(true);
Collection<SubClusterId> subClusterIds = subClusterInfo.keySet();
List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
Map<SubClusterId, Exception> exceptions = new TreeMap<>();
// Generate parallel Callable tasks
for (SubClusterId subClusterId : subClusterIds) {
callables.add(() -> {
ResourceManagerAdministrationProtocol protocol =
rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterId);
Class<?>[] types = this.getTypes();
Object[] params = this.getParams();
Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
Object result = method.invoke(protocol, params);
return Pair.of(subClusterId, result);
});
}
// Get results from multiple threads
Map<SubClusterId, R> results = new TreeMap<>();
try {
futures.addAll(executorService.invokeAll(callables));
futures.stream().forEach(future -> {
SubClusterId subClusterId = null;
try {
Pair<SubClusterId, Object> pair = future.get();
subClusterId = pair.getKey();
Object result = pair.getValue();
if (result != null) {
R rResult = clazz.cast(result);
results.put(subClusterId, rResult);
}
} catch (InterruptedException | ExecutionException e) {
Throwable cause = e.getCause();
LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, cause.getMessage());
exceptions.put(subClusterId, e);
}
});
} catch (InterruptedException e) {
throw new YarnException("invokeConcurrent Failed.", e);
}
// All sub-clusters return results to be considered successful,
// otherwise an exception will be thrown.
if (exceptions != null && !exceptions.isEmpty()) {
Set<SubClusterId> subClusterIdSets = exceptions.keySet();
throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
StringUtils.join(subClusterIdSets, ","));
}
// return result
return results.values();
}
/**
* Call the method in the protocol according to the subClusterId.
*
* @param clazz return type
* @param subClusterId subCluster Id
* @param <R> Generic R
* @return response collection.
* @throws YarnException yarn exception.
*/
protected <R> Collection<R> invoke(Class<R> clazz, String subClusterId) throws YarnException {
// Get the method name to call
String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
this.setMethodName(methodName);
// Get Active SubClusters
Map<SubClusterId, SubClusterInfo> subClusterInfoMap =
federationFacade.getSubClusters(true);
// According to subCluster of string type, convert to SubClusterId type
SubClusterId subClusterIdKey = SubClusterId.newInstance(subClusterId);
// If the provided subCluster is not Active or does not exist,
// an exception will be returned directly.
if (!subClusterInfoMap.containsKey(subClusterIdKey)) {
throw new YarnException("subClusterId = " + subClusterId + " is not an active subCluster.");
}
// Call the method in the protocol and convert it according to clazz.
try {
ResourceManagerAdministrationProtocol protocol =
rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterIdKey);
Class<?>[] types = this.getTypes();
Object[] params = this.getParams();
Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
Object result = method.invoke(protocol, params);
if (result != null) {
return Collections.singletonList(clazz.cast(result));
}
} catch (Exception e) {
throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
subClusterId, e);
}
throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
subClusterId);
}
}