FederationStateStoreService.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.federation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
/**
* Implements {@link FederationStateStore} and provides a service for
* participating in the federation membership.
*/
public class FederationStateStoreService extends AbstractService
implements FederationStateStore {
public static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreService.class);
private Configuration config;
private ScheduledExecutorService scheduledExecutorService;
private FederationStateStoreHeartbeat stateStoreHeartbeat;
private FederationStateStore stateStoreClient = null;
private SubClusterId subClusterId;
private long heartbeatInterval;
private long heartbeatInitialDelay;
private RMContext rmContext;
private final Clock clock = new MonotonicClock();
private FederationStateStoreServiceMetrics metrics;
private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread";
private int cleanUpRetryCountNum;
private long cleanUpRetrySleepTime;
public FederationStateStoreService(RMContext rmContext) {
super(FederationStateStoreService.class.getName());
LOG.info("FederationStateStoreService initialized");
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.config = conf;
RetryPolicy retryPolicy =
FederationStateStoreFacade.createRetryPolicy(conf);
this.stateStoreClient =
(FederationStateStore) FederationStateStoreFacade.createRetryInstance(
conf, YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
FederationStateStore.class, retryPolicy);
this.stateStoreClient.init(conf);
LOG.info("Initialized state store client class");
this.subClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
heartbeatInterval = conf.getLong(
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
if (heartbeatInterval <= 0) {
heartbeatInterval =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
}
heartbeatInitialDelay = conf.getTimeDuration(
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
TimeUnit.SECONDS);
if (heartbeatInitialDelay <= 0) {
LOG.warn("{} configured value is wrong, must be > 0; using default value of {}",
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY);
heartbeatInitialDelay =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
}
cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT);
cleanUpRetrySleepTime = conf.getTimeDuration(
YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
TimeUnit.MILLISECONDS);
LOG.info("Initialized federation membership service.");
this.metrics = FederationStateStoreServiceMetrics.getMetrics();
LOG.info("Initialized federation statestore service metrics.");
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
registerAndInitializeHeartbeat();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
Exception ex = null;
try {
if (this.scheduledExecutorService != null
&& !this.scheduledExecutorService.isShutdown()) {
this.scheduledExecutorService.shutdown();
LOG.info("Stopped federation membership heartbeat");
}
} catch (Exception e) {
LOG.error("Failed to shutdown ScheduledExecutorService", e);
ex = e;
}
if (this.stateStoreClient != null) {
try {
deregisterSubCluster(SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
} finally {
this.stateStoreClient.close();
}
}
if (ex != null) {
throw ex;
}
}
// Return a client accessible string representation of the service address.
private String getServiceAddress(InetSocketAddress address) {
InetSocketAddress socketAddress = NetUtils.getConnectAddress(address);
return socketAddress.getAddress().getHostAddress() + ":"
+ socketAddress.getPort();
}
private void registerAndInitializeHeartbeat() {
String clientRMAddress =
getServiceAddress(rmContext.getClientRMService().getBindAddress());
String amRMAddress = getServiceAddress(
rmContext.getApplicationMasterService().getBindAddress());
String rmAdminAddress = getServiceAddress(
config.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
String webAppAddress = getServiceAddress(NetUtils
.createSocketAddr(WebAppUtils.getRMWebAppURLWithScheme(config)));
SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
SubClusterState.SC_NEW, ResourceManager.getClusterTimeStamp(), "");
try {
registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo));
LOG.info("Successfully registered for federation subcluster: {}",
subClusterInfo);
} catch (Exception e) {
throw new YarnRuntimeException(
"Failed to register Federation membership with the StateStore", e);
}
stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId,
stateStoreClient, rmContext.getScheduler());
scheduledExecutorService =
HadoopExecutors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS);
LOG.info("Started federation membership heartbeat with interval: {} and initial delay: {}",
heartbeatInterval, heartbeatInitialDelay);
}
@VisibleForTesting
public FederationStateStore getStateStoreClient() {
return stateStoreClient;
}
@VisibleForTesting
public FederationStateStoreHeartbeat getStateStoreHeartbeatThread() {
return stateStoreHeartbeat;
}
@Override
public Version getCurrentVersion() {
return stateStoreClient.getCurrentVersion();
}
@Override
public Version loadVersion() throws Exception {
return stateStoreClient.getCurrentVersion();
}
@Override
public void storeVersion() throws Exception {
stateStoreClient.storeVersion();
}
@Override
public void checkVersion() throws Exception {
stateStoreClient.checkVersion();
}
@Override
public void deleteStateStore() throws Exception {
stateStoreClient.deleteStateStore();
}
@Override
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
FederationClientMethod<GetSubClusterPolicyConfigurationResponse> clientMethod =
new FederationClientMethod<>("getPolicyConfiguration",
GetSubClusterPolicyConfigurationRequest.class, request,
GetSubClusterPolicyConfigurationResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
FederationClientMethod<SetSubClusterPolicyConfigurationResponse> clientMethod =
new FederationClientMethod<>("setPolicyConfiguration",
SetSubClusterPolicyConfigurationRequest.class, request,
SetSubClusterPolicyConfigurationResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
FederationClientMethod<GetSubClusterPoliciesConfigurationsResponse> clientMethod =
new FederationClientMethod<>("getPoliciesConfigurations",
GetSubClusterPoliciesConfigurationsRequest.class, request,
GetSubClusterPoliciesConfigurationsResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations(
DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException {
FederationClientMethod<DeleteSubClusterPoliciesConfigurationsResponse> clientMethod =
new FederationClientMethod<>("deletePoliciesConfigurations",
DeleteSubClusterPoliciesConfigurationsRequest.class, request,
DeleteSubClusterPoliciesConfigurationsResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
DeletePoliciesConfigurationsRequest request) throws Exception {
FederationClientMethod<DeletePoliciesConfigurationsResponse> clientMethod =
new FederationClientMethod<>("deleteAllPoliciesConfigurations",
DeletePoliciesConfigurationsRequest.class, request,
DeletePoliciesConfigurationsResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
throws YarnException {
FederationClientMethod<SubClusterRegisterResponse> clientMethod =
new FederationClientMethod<>("registerSubCluster",
SubClusterRegisterRequest.class, request,
SubClusterRegisterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request)
throws YarnException {
FederationClientMethod<SubClusterDeregisterResponse> clientMethod =
new FederationClientMethod<>("deregisterSubCluster",
SubClusterDeregisterRequest.class, request,
SubClusterDeregisterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request)
throws YarnException {
FederationClientMethod<SubClusterHeartbeatResponse> clientMethod =
new FederationClientMethod<>("subClusterHeartbeat",
SubClusterHeartbeatRequest.class, request,
SubClusterHeartbeatResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
throws YarnException {
FederationClientMethod<GetSubClusterInfoResponse> clientMethod =
new FederationClientMethod<>("getSubCluster",
GetSubClusterInfoRequest.class, request,
GetSubClusterInfoResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request)
throws YarnException {
FederationClientMethod<GetSubClustersInfoResponse> clientMethod =
new FederationClientMethod<>("getSubClusters",
GetSubClustersInfoRequest.class, request,
GetSubClustersInfoResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<AddApplicationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("addApplicationHomeSubCluster",
AddApplicationHomeSubClusterRequest.class, request,
AddApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<UpdateApplicationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("updateApplicationHomeSubCluster",
AddApplicationHomeSubClusterRequest.class, request,
UpdateApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<GetApplicationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("getApplicationHomeSubCluster",
GetApplicationHomeSubClusterRequest.class, request,
GetApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<GetApplicationsHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("getApplicationsHomeSubCluster",
GetApplicationsHomeSubClusterRequest.class, request,
GetApplicationsHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<DeleteApplicationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("deleteApplicationHomeSubCluster",
DeleteApplicationHomeSubClusterRequest.class, request,
DeleteApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<AddReservationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("addReservationHomeSubCluster",
AddReservationHomeSubClusterRequest.class, request,
AddReservationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<GetReservationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("getReservationHomeSubCluster",
GetReservationHomeSubClusterRequest.class, request,
GetReservationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<GetReservationsHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("getReservationsHomeSubCluster",
GetReservationsHomeSubClusterRequest.class, request,
GetReservationsHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<UpdateReservationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("updateReservationHomeSubCluster",
GetReservationsHomeSubClusterRequest.class, request,
UpdateReservationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
FederationClientMethod<DeleteReservationHomeSubClusterResponse> clientMethod =
new FederationClientMethod<>("deleteReservationHomeSubCluster",
DeleteReservationHomeSubClusterRequest.class, request,
DeleteReservationHomeSubClusterResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>(
"storeNewMasterKey",
RouterMasterKeyRequest.class, request,
RouterMasterKeyResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>(
"removeStoredMasterKey",
RouterMasterKeyRequest.class, request,
RouterMasterKeyResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>(
"getMasterKeyByDelegationKey",
RouterMasterKeyRequest.class, request,
RouterMasterKeyResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
"storeNewToken",
RouterRMTokenRequest.class, request,
RouterRMTokenResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
"updateStoredToken",
RouterRMTokenRequest.class, request,
RouterRMTokenResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
"removeStoredToken",
RouterRMTokenRequest.class, request,
RouterRMTokenResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException {
FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
"getTokenByRouterStoreToken",
RouterRMTokenRequest.class, request,
RouterRMTokenResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public int incrementDelegationTokenSeqNum() {
return stateStoreClient.incrementDelegationTokenSeqNum();
}
@Override
public int getDelegationTokenSeqNum() {
return stateStoreClient.getDelegationTokenSeqNum();
}
@Override
public void setDelegationTokenSeqNum(int seqNum) {
stateStoreClient.setDelegationTokenSeqNum(seqNum);
}
@Override
public int getCurrentKeyId() {
return stateStoreClient.getCurrentKeyId();
}
@Override
public int incrementCurrentKeyId() {
return stateStoreClient.incrementCurrentKeyId();
}
/**
* Create a thread that cleans up the app.
* @param stage rm-start/rm-stop.
*/
public void createCleanUpFinishApplicationThread(String stage) {
String threadName = cleanUpThreadNamePrefix + "-" + stage;
Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread());
finishApplicationThread.setName(threadName);
finishApplicationThread.start();
LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName);
}
/**
* Create a thread that cleans up the apps.
*
* @return thread object.
*/
private Runnable createCleanUpFinishApplicationThread() {
return () -> {
createCleanUpFinishApplication();
};
}
/**
* cleans up the apps.
*/
private void createCleanUpFinishApplication() {
try {
// Get the current RM's App list based on subClusterId
GetApplicationsHomeSubClusterRequest request =
GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
GetApplicationsHomeSubClusterResponse response =
getApplicationsHomeSubCluster(request);
List<ApplicationHomeSubCluster> applicationHomeSCs = response.getAppsHomeSubClusters();
// Traverse the app list and clean up the app.
long successCleanUpAppCount = 0;
// Save a local copy of the map so that it won't change with the map
Map<ApplicationId, RMApp> rmApps = new HashMap<>(this.rmContext.getRMApps());
// Need to make sure there is app list in RM memory.
if (rmApps != null && !rmApps.isEmpty()) {
for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) {
ApplicationId applicationId = applicationHomeSC.getApplicationId();
if (!rmApps.containsKey(applicationId)) {
try {
Boolean cleanUpSuccess = cleanUpFinishApplicationsWithRetries(applicationId, false);
if (cleanUpSuccess) {
LOG.info("application = {} has been cleaned up successfully.", applicationId);
successCleanUpAppCount++;
}
} catch (Exception e) {
LOG.error("problem during application = {} cleanup.", applicationId, e);
}
}
}
}
// print app cleanup log
LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.",
applicationHomeSCs.size(), successCleanUpAppCount);
} catch (Exception e) {
LOG.error("problem during cleanup applications.", e);
}
}
/**
* Clean up the federation completed Application.
*
* @param appId app id.
* @param isQuery true, need to query from statestore, false not query.
* @throws Exception exception occurs.
* @return true, successfully deleted; false, failed to delete or no need to delete
*/
public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean isQuery)
throws Exception {
// Generate a request to delete data
DeleteApplicationHomeSubClusterRequest req =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
// CleanUp Finish App.
return ((FederationActionRetry<Boolean>) (retry) -> invokeCleanUpFinishApp(appId, isQuery, req))
.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime);
}
/**
* CleanUp Finish App.
*
* @param applicationId app id.
* @param isQuery true, need to query from statestore, false not query.
* @param delRequest delete Application Request
* @return true, successfully deleted; false, failed to delete or no need to delete
* @throws YarnException
*/
private boolean invokeCleanUpFinishApp(ApplicationId applicationId, boolean isQuery,
DeleteApplicationHomeSubClusterRequest delRequest) throws YarnException {
boolean isAppNeedClean = true;
// If we need to query the StateStore
if (isQuery) {
isAppNeedClean = isApplicationNeedClean(applicationId);
}
// When the App needs to be cleaned up, clean up the App.
if (isAppNeedClean) {
DeleteApplicationHomeSubClusterResponse response =
deleteApplicationHomeSubCluster(delRequest);
if (response != null) {
LOG.info("The applicationId = {} has been successfully cleaned up.", applicationId);
return true;
}
}
return false;
}
/**
* Used to determine whether the Application is cleaned up.
*
* When the app in the RM is completed,
* the HomeSC corresponding to the app will be queried in the StateStore.
* If the current RM is the HomeSC, the completed app will be cleaned up.
*
* @param applicationId applicationId
* @return true, app needs to be cleaned up;
* false, app doesn't need to be cleaned up.
*/
private boolean isApplicationNeedClean(ApplicationId applicationId) {
GetApplicationHomeSubClusterRequest queryRequest =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
// Here we need to use try...catch,
// because getApplicationHomeSubCluster may throw not exist exception
try {
GetApplicationHomeSubClusterResponse queryResp =
getApplicationHomeSubCluster(queryRequest);
if (queryResp != null) {
ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster();
SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster();
if (!subClusterId.equals(homeSubClusterId)) {
LOG.warn("The homeSubCluster of applicationId = {} belong subCluster = {}, " +
" not belong subCluster = {} and is not allowed to delete.",
applicationId, homeSubClusterId, subClusterId);
return false;
}
} else {
LOG.warn("The applicationId = {} not belong subCluster = {} " +
" and is not allowed to delete.", applicationId, subClusterId);
return false;
}
} catch (Exception e) {
LOG.warn("query applicationId = {} error.", applicationId, e);
return false;
}
return true;
}
}