FederationCache.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.cache;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class FederationCache {
// ------------------------------------ Constants -------------------------
protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";
protected static final String POINT = ".";
private FederationStateStore stateStore;
/**
* Determine whether to enable cache.
* We judge whether to enable the cache according to the cache time.
* If the cache time is greater than 0, the cache is enabled.
* If the cache time is less than or equal 0, the cache is not enabled.
*
* @return true, enable cache; false, not enable cache.
*/
public abstract boolean isCachingEnabled();
/**
* Initialize the cache.
*
* @param pConf Configuration.
* @param pStateStore FederationStateStore.
*/
public abstract void initCache(Configuration pConf, FederationStateStore pStateStore);
/**
* clear cache.
*/
public abstract void clearCache();
/**
* Build CacheKey.
*
* @param className Cache Class Name.
* @param methodName Method Name.
* @return append result.
* Example: className:FederationJCache, methodName:getPoliciesConfigurations.
* We Will Return FederationJCache.getPoliciesConfigurations.
*/
protected String buildCacheKey(String className, String methodName) {
return buildCacheKey(className, methodName, null);
}
/**
* Build CacheKey.
*
* @param className Cache Class Name.
* @param methodName Method Name.
* @param argName ArgName.
* @return append result.
* Example:
* className:FederationJCache, methodName:getApplicationHomeSubCluster, argName: app_1
* We Will Return FederationJCache.getApplicationHomeSubCluster.app_1
*/
protected String buildCacheKey(String className, String methodName, String argName) {
StringBuilder buffer = new StringBuilder();
buffer.append(className).append(POINT).append(methodName);
if (argName != null) {
buffer.append(POINT);
buffer.append(argName);
}
return buffer.toString();
}
/**
* Returns the {@link SubClusterInfo} of all active sub cluster(s).
*
* @param filterInactiveSubClusters whether to filter out inactive
* sub-clusters
* @return the information of all active sub cluster(s)
* @throws YarnException if the call to the state store is unsuccessful
*/
public abstract Map<SubClusterId, SubClusterInfo> getSubClusters(
boolean filterInactiveSubClusters) throws YarnException;
/**
* Get the policies that is represented as
* {@link SubClusterPolicyConfiguration} for all currently active queues in
* the system.
*
* @return the policies for all currently active queues in the system
* @throws YarnException if the call to the state store is unsuccessful
*/
public abstract Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
throws Exception;
/**
* Returns the home {@link SubClusterId} for the specified
* {@link ApplicationId}.
*
* @param appId the identifier of the application
* @return the home sub cluster identifier
* @throws YarnException if the call to the state store is unsuccessful
*/
public abstract SubClusterId getApplicationHomeSubCluster(ApplicationId appId) throws Exception;
/**
* Remove SubCluster from cache.
*
* @param filterInactiveSubClusters whether to filter out inactive
* sub-clusters.
*/
public abstract void removeSubCluster(boolean filterInactiveSubClusters);
// ------------------------------------ SubClustersCache -------------------------
/**
* Build GetSubClusters CacheRequest.
*
* @param cacheKey cacheKey.
* @param filterInactiveSubClusters filter Inactive SubClusters.
* @return CacheRequest.
* @throws YarnException exceptions from yarn servers.
*/
protected CacheRequest<String, CacheResponse<SubClusterInfo>> buildGetSubClustersCacheRequest(
String cacheKey, final boolean filterInactiveSubClusters) throws YarnException {
CacheResponse<SubClusterInfo> response =
buildSubClusterInfoResponse(filterInactiveSubClusters);
CacheRequest<String, CacheResponse<SubClusterInfo>> cacheRequest =
new CacheRequest<>(cacheKey, response);
return cacheRequest;
}
/**
* Build SubClusterInfo Response.
*
* @param filterInactiveSubClusters whether to filter out inactive sub-clusters.
* @return SubClusterInfo Response.
* @throws YarnException exceptions from yarn servers.
*/
private CacheResponse<SubClusterInfo> buildSubClusterInfoResponse(
final boolean filterInactiveSubClusters) throws YarnException {
GetSubClustersInfoRequest request = GetSubClustersInfoRequest.newInstance(
filterInactiveSubClusters);
GetSubClustersInfoResponse subClusters = stateStore.getSubClusters(request);
CacheResponse<SubClusterInfo> response = new SubClusterInfoCacheResponse();
response.setList(subClusters.getSubClusters());
return response;
}
/**
* According to the response, build SubClusterInfoMap.
*
* @param response GetSubClustersInfoResponse.
* @return SubClusterInfoMap.
*/
public static Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
final GetSubClustersInfoResponse response) {
List<SubClusterInfo> subClusters = response.getSubClusters();
return buildSubClusterInfoMap(subClusters);
}
/**
* According to the cacheRequest, build SubClusterInfoMap.
*
* @param cacheRequest CacheRequest.
* @return SubClusterInfoMap.
*/
public static Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
CacheRequest<String, ?> cacheRequest) {
Object value = cacheRequest.value;
SubClusterInfoCacheResponse response = SubClusterInfoCacheResponse.class.cast(value);
List<SubClusterInfo> subClusters = response.getList();
return buildSubClusterInfoMap(subClusters);
}
/**
* According to the subClusters, build SubClusterInfoMap.
*
* @param subClusters subCluster List.
* @return SubClusterInfoMap.
*/
private static Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
List<SubClusterInfo> subClusters) {
Map<SubClusterId, SubClusterInfo> subClustersMap = new HashMap<>(subClusters.size());
for (SubClusterInfo subCluster : subClusters) {
subClustersMap.put(subCluster.getSubClusterId(), subCluster);
}
return subClustersMap;
}
// ------------------------------------ ApplicationHomeSubClusterCache -------------------------
/**
* Build GetApplicationHomeSubCluster CacheRequest.
*
* @param cacheKey cacheKey.
* @param applicationId applicationId.
* @return CacheRequest.
* @throws YarnException exceptions from yarn servers.
*/
protected CacheRequest<String, CacheResponse<SubClusterId>>
buildGetApplicationHomeSubClusterRequest(String cacheKey, ApplicationId applicationId)
throws YarnException {
CacheResponse<SubClusterId> response = buildSubClusterIdResponse(applicationId);
return new CacheRequest<>(cacheKey, response);
}
/**
* Build SubClusterId Response.
*
* @param applicationId applicationId.
* @return subClusterId
* @throws YarnException exceptions from yarn servers.
*/
private CacheResponse<SubClusterId> buildSubClusterIdResponse(final ApplicationId applicationId)
throws YarnException {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);
ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();
CacheResponse<SubClusterId> cacheResponse = new ApplicationHomeSubClusterCacheResponse();
cacheResponse.setItem(subClusterId);
return cacheResponse;
}
// ------------------------------ SubClusterPolicyConfigurationCache -------------------------
/**
* Build GetPoliciesConfigurations CacheRequest.
*
* @param cacheKey cacheKey.
* @return CacheRequest.
* @throws YarnException exceptions from yarn servers.
*/
protected CacheRequest<String, CacheResponse<SubClusterPolicyConfiguration>>
buildGetPoliciesConfigurationsCacheRequest(String cacheKey) throws YarnException {
CacheResponse<SubClusterPolicyConfiguration> response =
buildSubClusterPolicyConfigurationResponse();
return new CacheRequest<>(cacheKey, response);
}
/**
* According to the response, build PolicyConfigMap.
*
* @param response GetSubClusterPoliciesConfigurationsResponse.
* @return PolicyConfigMap.
*/
public static Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
GetSubClusterPoliciesConfigurationsResponse response) {
List<SubClusterPolicyConfiguration> policyConfigs = response.getPoliciesConfigs();
return buildPolicyConfigMap(policyConfigs);
}
/**
* According to the subClusters, build PolicyConfigMap.
*
* @param policyConfigs SubClusterPolicyConfigurations
* @return PolicyConfigMap.
*/
private static Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
List<SubClusterPolicyConfiguration> policyConfigs) {
Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs = new HashMap<>();
for (SubClusterPolicyConfiguration policyConfig : policyConfigs) {
queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig);
}
return queuePolicyConfigs;
}
/**
* According to the cacheRequest, build PolicyConfigMap.
*
* @param cacheRequest CacheRequest.
* @return PolicyConfigMap.
*/
public static Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
CacheRequest<String, ?> cacheRequest){
Object value = cacheRequest.value;
SubClusterPolicyConfigurationCacheResponse response =
SubClusterPolicyConfigurationCacheResponse.class.cast(value);
List<SubClusterPolicyConfiguration> subClusters = response.getList();
return buildPolicyConfigMap(subClusters);
}
/**
* Build SubClusterPolicyConfiguration Response.
*
* @return SubClusterPolicyConfiguration Response.
* @throws YarnException exceptions from yarn servers.
*/
private CacheResponse<SubClusterPolicyConfiguration> buildSubClusterPolicyConfigurationResponse()
throws YarnException {
GetSubClusterPoliciesConfigurationsRequest request =
GetSubClusterPoliciesConfigurationsRequest.newInstance();
GetSubClusterPoliciesConfigurationsResponse response =
stateStore.getPoliciesConfigurations(request);
List<SubClusterPolicyConfiguration> policyConfigs = response.getPoliciesConfigs();
CacheResponse<SubClusterPolicyConfiguration> cacheResponse =
new SubClusterPolicyConfigurationCacheResponse();
cacheResponse.setList(policyConfigs);
return cacheResponse;
}
/**
* Internal class that encapsulates the cache key and a function that returns
* the value for the specified key.
*/
public class CacheRequest<K, V> {
private K key;
private V value;
CacheRequest(K pKey, V pValue) {
this.key = pKey;
this.value = pValue;
}
public V getValue() throws Exception {
return value;
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(key).toHashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (obj instanceof CacheRequest) {
Class<CacheRequest> cacheRequestClass = CacheRequest.class;
CacheRequest other = cacheRequestClass.cast(obj);
return new EqualsBuilder().append(key, other.key).isEquals();
}
return false;
}
}
public class CacheResponse<R> {
private List<R> list;
private R item;
public List<R> getList() {
return list;
}
public void setList(List<R> list) {
this.list = list;
}
public R getItem() {
return item;
}
public void setItem(R pItem) {
this.item = pItem;
}
}
public class SubClusterInfoCacheResponse extends CacheResponse<SubClusterInfo> {
@Override
public List<SubClusterInfo> getList() {
return super.getList();
}
@Override
public void setList(List<SubClusterInfo> list) {
super.setList(list);
}
@Override
public SubClusterInfo getItem() {
return super.getItem();
}
@Override
public void setItem(SubClusterInfo item) {
super.setItem(item);
}
}
public class SubClusterPolicyConfigurationCacheResponse
extends CacheResponse<SubClusterPolicyConfiguration> {
@Override
public List<SubClusterPolicyConfiguration> getList() {
return super.getList();
}
@Override
public void setList(List<SubClusterPolicyConfiguration> list) {
super.setList(list);
}
@Override
public SubClusterPolicyConfiguration getItem() {
return super.getItem();
}
@Override
public void setItem(SubClusterPolicyConfiguration item) {
super.setItem(item);
}
}
public class ApplicationHomeSubClusterCacheResponse
extends CacheResponse<SubClusterId> {
@Override
public List<SubClusterId> getList() {
return super.getList();
}
@Override
public void setList(List<SubClusterId> list) {
super.setList(list);
}
@Override
public SubClusterId getItem() {
return super.getItem();
}
@Override
public void setItem(SubClusterId item) {
super.setItem(item);
}
}
public FederationStateStore getStateStore() {
return stateStore;
}
public void setStateStore(FederationStateStore stateStore) {
this.stateStore = stateStore;
}
}