TestFederationClientInterceptor.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the
* {@code FederationInterceptor} class. The tests for
* {@code RouterClientRMService} has been written cleverly so that it can be
* reused to validate different request interceptor chains.
*/
public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationClientInterceptor.class);
private TestableFederationClientInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private List<SubClusterId> subClusters;
private String user = "test-user";
private final static int NUM_SUBCLUSTER = 4;
private final static int APP_PRIORITY_ZERO = 0;
private final static long DEFAULT_DURATION = 10 * 60 * 1000;
@BeforeEach
@Override
public void setUp() throws IOException {
super.setUpConfig();
interceptor = new TestableFederationClientInterceptor();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance(getConf()).reinitialize(stateStore, getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
RouterDelegationTokenSecretManager tokenSecretManager =
interceptor.createRouterRMDelegationTokenSecretManager(this.getConf());
tokenSecretManager.startThreads();
interceptor.setTokenSecretManager(tokenSecretManager);
subClusters = new ArrayList<>();
try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
stateStoreUtil.registerSubCluster(sc);
subClusters.add(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
fail();
}
DefaultMetricsSystem.setMiniClusterMode(true);
}
@AfterEach
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain is the federation interceptor that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
conf.setInt("yarn.scheduler.maximum-allocation-mb", 100 * 1024);
conf.setInt("yarn.scheduler.maximum-allocation-vcores", 100);
conf.setBoolean("hadoop.security.authentication", true);
return conf;
}
/**
* This test validates the correctness of GetNewApplication. The return
* ApplicationId has to belong to one of the SubCluster in the cluster.
*/
@Test
public void testGetNewApplication() throws YarnException, IOException {
LOG.info("Test FederationClientInterceptor: Get New Application.");
GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
GetNewApplicationResponse response = interceptor.getNewApplication(request);
assertNotNull(response);
assertNotNull(response.getApplicationId());
assertEquals(response.getApplicationId().getClusterTimestamp(),
ResourceManager.getClusterTimeStamp());
}
/**
* This test validates the correctness of SubmitApplication. The application
* has to be submitted to one of the SubCluster in the cluster.
*/
@Test
public void testSubmitApplication()
throws YarnException, IOException {
LOG.info("Test FederationClientInterceptor: Submit Application.");
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(scIdResult);
assertTrue(subClusters.contains(scIdResult));
}
private SubmitApplicationRequest mockSubmitApplicationRequest(
ApplicationId appId) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
appId, MockApps.newAppName(), "default",
Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
"MockApp");
SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
return request;
}
/**
* This test validates the correctness of SubmitApplication in case of
* multiple submission. The first retry has to be submitted to the same
* SubCluster of the first attempt.
*/
@Test
public void testSubmitApplicationMultipleSubmission()
throws YarnException, IOException, InterruptedException {
LOG.info(
"Test FederationClientInterceptor: Submit Application - Multiple");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// First attempt
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(scIdResult);
// First retry
response = interceptor.submitApplication(request);
assertNotNull(response);
SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(scIdResult2);
assertEquals(scIdResult, scIdResult);
}
/**
* This test validates the correctness of SubmitApplication in case of empty
* request.
*/
@Test
public void testSubmitApplicationEmptyRequest()
throws Exception {
LOG.info("Test FederationClientInterceptor: Submit Application - Empty.");
// null request1
LambdaTestUtils.intercept(YarnException.class,
"Missing submitApplication request or applicationSubmissionContext information.",
() -> interceptor.submitApplication(null));
// null request2
LambdaTestUtils.intercept(YarnException.class,
"Missing submitApplication request or applicationSubmissionContext information.",
() -> interceptor.submitApplication(SubmitApplicationRequest.newInstance(null)));
// null request3
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(null, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
LambdaTestUtils.intercept(YarnException.class,
"Missing submitApplication request or applicationSubmissionContext information.",
() -> interceptor.submitApplication(request));
}
/**
* This test validates the correctness of ForceKillApplication in case the
* application exists in the cluster.
*/
@Test
public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: Force Kill Application.");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application we are going to kill later
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId);
KillApplicationResponse responseKill = interceptor.forceKillApplication(requestKill);
assertNotNull(responseKill);
}
@Test
@Disabled
public void testForceKillApplicationAllSubClusters()
throws IOException, YarnException, InterruptedException, TimeoutException {
// TODO: testForceKillApplicationAllSubClusters sometimes fails to run, temporarily disable
// We will design a unit test. In this unit test,
// we will submit the same application to all sub-clusters.
// Then we use interceptor kill application,
// the application should be cleared from all sub-clusters.
Set<SubClusterId> subClusterSet = new HashSet<>();
for (SubClusterId subCluster : subClusters) {
subClusterSet.add(subCluster);
}
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application we are going to kill later
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
SubClusterId subClusterId = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
subClusterSet.remove(subClusterId);
for (SubClusterId subCluster : subClusterSet) {
LOG.info("SubCluster : {}.", subCluster);
ApplicationClientProtocol clientRMProxyForSubCluster =
interceptor.getClientRMProxyForSubCluster(subCluster);
clientRMProxyForSubCluster.submitApplication(request);
}
KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId);
GenericTestUtils.waitFor(() -> {
KillApplicationResponse responseKill;
try {
responseKill = interceptor.forceKillApplication(requestKill);
} catch (Exception e) {
throw new RuntimeException(e);
}
return (responseKill.getIsKillCompleted());
}, 100, 10000);
for (SubClusterId subCluster : subClusters) {
ApplicationClientProtocol clientRMProxyForSubCluster =
interceptor.getClientRMProxyForSubCluster(subCluster);
GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId);
GetApplicationReportResponse responseGet =
clientRMProxyForSubCluster.getApplicationReport(requestGet);
assertNotNull(responseGet);
ApplicationReport applicationReport = responseGet.getApplicationReport();
assertNotNull(applicationReport);
YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
assertNotNull(yarnApplicationState);
assertEquals(YarnApplicationState.KILLED, yarnApplicationState);
}
}
/**
* This test validates the correctness of ForceKillApplication in case of
* application does not exist in StateStore.
*/
@Test
public void testForceKillApplicationNotExists() throws Exception {
LOG.info("Test FederationClientInterceptor: Force Kill Application - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId);
LambdaTestUtils.intercept(YarnException.class,
"Application " + appId + " does not exist in FederationStateStore.",
() -> interceptor.forceKillApplication(requestKill));
}
/**
* This test validates the correctness of ForceKillApplication in case of
* empty request.
*/
@Test
public void testForceKillApplicationEmptyRequest()
throws Exception {
LOG.info("Test FederationClientInterceptor: Force Kill Application - Empty.");
// null request1
LambdaTestUtils.intercept(YarnException.class,
"Missing forceKillApplication request or ApplicationId.",
() -> interceptor.forceKillApplication(null));
// null request2
KillApplicationRequest killRequest = KillApplicationRequest.newInstance(null);
LambdaTestUtils.intercept(YarnException.class,
"Missing forceKillApplication request or ApplicationId.",
() -> interceptor.forceKillApplication(killRequest));
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationReport()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: Get Application Report");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application we want the report later
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
GetApplicationReportRequest requestGet =
GetApplicationReportRequest.newInstance(appId);
GetApplicationReportResponse responseGet =
interceptor.getApplicationReport(requestGet);
assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationNotExists()
throws Exception {
LOG.info("Test ApplicationClientProtocol: Get Application Report - Not Exists.");
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId);
LambdaTestUtils.intercept(YarnException.class,
"Application " + appId + " does not exist in FederationStateStore.",
() -> interceptor.getApplicationReport(requestGet));
}
/**
* This test validates the correctness of GetApplicationReport in case of
* empty request.
*/
@Test
public void testGetApplicationEmptyRequest()
throws Exception {
LOG.info("Test FederationClientInterceptor: Get Application Report - Empty.");
// null request1
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationReport request or applicationId information.",
() -> interceptor.getApplicationReport(null));
// null request2
GetApplicationReportRequest reportRequest = GetApplicationReportRequest.newInstance(null);
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationReport request or applicationId information.",
() -> interceptor.getApplicationReport(reportRequest));
}
/**
* This test validates the correctness of GetApplicationAttemptReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationAttemptReport()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report.");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application we want the applicationAttempt report later
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
// Call GetApplicationAttempts Get ApplicationAttemptId
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
// Wait for app to start
while(attemptsResponse.getApplicationAttemptList().size() == 0) {
attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
}
assertNotNull(attemptsResponse);
GetApplicationAttemptReportRequest requestGet =
GetApplicationAttemptReportRequest.newInstance(
attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId());
GetApplicationAttemptReportResponse responseGet =
interceptor.getApplicationAttemptReport(requestGet);
assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationAttemptReport in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationAttemptNotExists() throws Exception {
LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Not Exists.");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId, 1);
GetApplicationAttemptReportRequest requestGet =
GetApplicationAttemptReportRequest.newInstance(appAttemptID);
LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " +
appAttemptID + " belongs to Application " +
appId + " does not exist in FederationStateStore.",
() -> interceptor.getApplicationAttemptReport(requestGet));
}
/**
* This test validates the correctness of GetApplicationAttemptReport in case of
* empty request.
*/
@Test
public void testGetApplicationAttemptEmptyRequest()
throws Exception {
LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty.");
// null request1
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport request or applicationId " +
"or applicationAttemptId information.",
() -> interceptor.getApplicationAttemptReport(null));
// null request2
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport request or applicationId " +
"or applicationAttemptId information.",
() -> interceptor.getApplicationAttemptReport(
GetApplicationAttemptReportRequest.newInstance(null)));
// null request3
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport request or applicationId " +
"or applicationAttemptId information.",
() -> interceptor.getApplicationAttemptReport(
GetApplicationAttemptReportRequest.newInstance(
ApplicationAttemptId.newInstance(null, 1))));
}
@Test
public void testGetClusterMetricsRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterMetrics request.",
() -> interceptor.getClusterMetrics(null));
// normal request.
GetClusterMetricsResponse response =
interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance());
assertEquals(subClusters.size(),
response.getClusterMetrics().getNumNodeManagers());
// Clear Membership
Map<SubClusterId, SubClusterInfo> membership = new HashMap<>();
membership.putAll(stateStore.getMembership());
stateStore.getMembership().clear();
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()});
Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.invokeConcurrent(
remoteMethod, GetClusterMetricsResponse.class);
assertTrue(clusterMetrics.isEmpty());
// Restore membership
stateStore.setMembership(membership);
}
/**
* This test validates the correctness of GetApplicationsResponse in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationsResponse()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: Get Applications Response.");
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
Set<String> appTypes = Collections.singleton("MockApp");
GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationsResponse in case of
* empty request.
*/
@Test
public void testGetApplicationsNullRequest() throws Exception {
LOG.info("Test FederationClientInterceptor: Get Applications request.");
LambdaTestUtils.intercept(YarnException.class, "Missing getApplications request.",
() -> interceptor.getApplications(null));
}
/**
* This test validates the correctness of GetApplicationsResponse in case applications
* with given type does not exist.
*/
@Test
public void testGetApplicationsApplicationTypeNotExists() throws Exception{
LOG.info("Test FederationClientInterceptor: Application with type does not exist.");
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
Set<String> appTypes = Collections.singleton("SPARK");
GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
assertNotNull(responseGet);
assertTrue(responseGet.getApplicationList().isEmpty());
}
/**
* This test validates the correctness of GetApplicationsResponse in case applications
* with given YarnApplicationState does not exist.
*/
@Test
public void testGetApplicationsApplicationStateNotExists() throws Exception {
LOG.info("Test FederationClientInterceptor: Application with state does not exist.");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
EnumSet<YarnApplicationState> applicationStates = EnumSet.noneOf(
YarnApplicationState.class);
applicationStates.add(YarnApplicationState.KILLED);
GetApplicationsRequest requestGet =
GetApplicationsRequest.newInstance(applicationStates);
GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
assertNotNull(responseGet);
assertTrue(responseGet.getApplicationList().isEmpty());
}
@Test
public void testGetClusterNodesRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
() -> interceptor.getClusterNodes(null));
// normal request.
GetClusterNodesResponse response =
interceptor.getClusterNodes(GetClusterNodesRequest.newInstance());
assertEquals(subClusters.size(), response.getNodeReports().size());
}
@Test
public void testGetNodeToLabelsRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Node To Labels request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.",
() -> interceptor.getNodeToLabels(null));
// normal request.
GetNodesToLabelsResponse response =
interceptor.getNodeToLabels(GetNodesToLabelsRequest.newInstance());
assertEquals(0, response.getNodeToLabels().size());
}
@Test
public void testGetLabelsToNodesRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Labels To Node request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.",
() -> interceptor.getLabelsToNodes(null));
// normal request.
GetLabelsToNodesResponse response =
interceptor.getLabelsToNodes(GetLabelsToNodesRequest.newInstance());
assertEquals(0, response.getLabelsToNodes().size());
}
@Test
public void testClusterNodeLabelsRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.",
() -> interceptor.getClusterNodeLabels(null));
// normal request.
GetClusterNodeLabelsResponse response =
interceptor.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
assertEquals(0, response.getNodeLabelList().size());
}
@Test
public void testGetQueueUserAcls() throws Exception {
LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.",
() -> interceptor.getQueueUserAcls(null));
// normal request
GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls(
GetQueueUserAclsInfoRequest.newInstance());
assertNotNull(response);
List<QueueACL> submitAndAdministerAcl = new ArrayList<>();
submitAndAdministerAcl.add(QueueACL.SUBMIT_APPLICATIONS);
submitAndAdministerAcl.add(QueueACL.ADMINISTER_QUEUE);
QueueUserACLInfo exceptRootQueueACLInfo = QueueUserACLInfo.newInstance("root",
submitAndAdministerAcl);
QueueUserACLInfo queueRootQueueACLInfo = response.getUserAclsInfoList().stream().
filter(acl->acl.getQueueName().equals("root")).
collect(Collectors.toList()).get(0);
assertEquals(exceptRootQueueACLInfo, queueRootQueueACLInfo);
}
@Test
public void testListReservations() throws Exception {
LOG.info("Test FederationClientInterceptor : Get ListReservations request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.",
() -> interceptor.listReservations(null));
// normal request
ReservationId reservationId = ReservationId.newInstance(1653487680L, 1L);
ReservationListResponse response = interceptor.listReservations(
ReservationListRequest.newInstance("root.decided", reservationId.toString()));
assertNotNull(response);
assertEquals(0, response.getReservationAllocationState().size());
}
@Test
public void testGetContainersRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Containers request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " +
"or ApplicationAttemptId.", () -> interceptor.getContainers(null));
// normal request
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
// Wait for app to start
while(attemptsResponse.getApplicationAttemptList().size() == 0) {
attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
}
assertNotNull(attemptsResponse);
// Call GetContainers
GetContainersRequest containersRequest =
GetContainersRequest.newInstance(
attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId());
GetContainersResponse containersResponse =
interceptor.getContainers(containersRequest);
assertNotNull(containersResponse);
}
@Test
public void testGetContainerReportRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Container Report request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getContainerReport request " +
"or containerId", () -> interceptor.getContainerReport(null));
// normal request
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
// Wait for app to start
while(attemptsResponse.getApplicationAttemptList().size() == 0) {
attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
}
assertNotNull(attemptsResponse);
ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList().
get(0).getApplicationAttemptId();
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
// Call ContainerReport, RM does not allocate Container, Here is null
GetContainerReportRequest containerReportRequest =
GetContainerReportRequest.newInstance(containerId);
GetContainerReportResponse containerReportResponse =
interceptor.getContainerReport(containerReportRequest);
assertEquals(containerReportResponse, null);
}
@Test
public void getApplicationAttempts() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Application Attempts request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getApplicationAttempts " +
"request or application id.", () -> interceptor.getApplicationAttempts(null));
// normal request
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
assertNotNull(attemptsResponse);
}
@Test
public void testGetResourceTypeInfoRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.",
() -> interceptor.getResourceTypeInfo(null));
// normal request.
GetAllResourceTypeInfoResponse response =
interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance());
assertEquals(2, response.getResourceTypeInfo().size());
}
@Test
public void testFailApplicationAttempt() throws Exception {
LOG.info("Test FederationClientInterceptor : Fail Application Attempt request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " +
"or applicationId or applicationAttemptId information.",
() -> interceptor.failApplicationAttempt(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
assertNotNull(attemptsResponse);
ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList().
get(0).getApplicationAttemptId();
FailApplicationAttemptRequest requestFailAppAttempt =
FailApplicationAttemptRequest.newInstance(attemptId);
FailApplicationAttemptResponse responseFailAppAttempt =
interceptor.failApplicationAttempt(requestFailAppAttempt);
assertNotNull(responseFailAppAttempt);
}
@Test
public void testUpdateApplicationPriority() throws Exception {
LOG.info("Test FederationClientInterceptor : Update Application Priority request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " +
"or applicationId or applicationPriority information.",
() -> interceptor.updateApplicationPriority(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
assertNotNull(attemptsResponse);
Priority priority = Priority.newInstance(20);
UpdateApplicationPriorityRequest requestUpdateAppPriority =
UpdateApplicationPriorityRequest.newInstance(appId, priority);
UpdateApplicationPriorityResponse responseAppPriority =
interceptor.updateApplicationPriority(requestUpdateAppPriority);
assertNotNull(responseAppPriority);
assertEquals(20,
responseAppPriority.getApplicationPriority().getPriority());
}
@Test
public void testUpdateApplicationTimeouts() throws Exception {
LOG.info("Test FederationClientInterceptor : Update Application Timeouts request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " +
"or applicationId or applicationTimeouts information.",
() -> interceptor.updateApplicationTimeouts(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
assertNotNull(attemptsResponse);
String appTimeout =
Times.formatISO8601(System.currentTimeMillis() + 5 * 1000);
Map<ApplicationTimeoutType, String> applicationTimeouts = new HashMap<>();
applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout);
UpdateApplicationTimeoutsRequest timeoutsRequest =
UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts);
UpdateApplicationTimeoutsResponse timeoutsResponse =
interceptor.updateApplicationTimeouts(timeoutsRequest);
String responseTimeOut =
timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
assertNotNull(timeoutsResponse);
assertEquals(appTimeout, responseTimeOut);
}
@Test
public void testSignalContainer() throws Exception {
LOG.info("Test FederationClientInterceptor : Signal Container request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " +
"or containerId or command information.", () -> interceptor.signalToContainer(null));
// normal request
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true);
MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
SignalContainerRequest signalContainerRequest =
SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN);
SignalContainerResponse signalContainerResponse =
interceptor.signalToContainer(signalContainerRequest);
assertNotNull(signalContainerResponse);
}
@Test
public void testMoveApplicationAcrossQueues() throws Exception {
LOG.info("Test FederationClientInterceptor : MoveApplication AcrossQueues request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing moveApplicationAcrossQueues request " +
"or applicationId or target queue.", () -> interceptor.moveApplicationAcrossQueues(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true);
MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
MoveApplicationAcrossQueuesRequest acrossQueuesRequest =
MoveApplicationAcrossQueuesRequest.newInstance(appId, "root.target");
MoveApplicationAcrossQueuesResponse acrossQueuesResponse =
interceptor.moveApplicationAcrossQueues(acrossQueuesRequest);
assertNotNull(acrossQueuesResponse);
}
@Test
public void testGetQueueInfo() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Queue Info request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getQueueInfo request or queueName.",
() -> interceptor.getQueueInfo(null));
// normal request
GetQueueInfoResponse response = interceptor.getQueueInfo(
GetQueueInfoRequest.newInstance("root", true, true, true));
assertNotNull(response);
QueueInfo queueInfo = response.getQueueInfo();
assertNotNull(queueInfo);
assertEquals("root", queueInfo.getQueueName());
assertEquals(4.0, queueInfo.getCapacity(), 0);
assertEquals(0.0, queueInfo.getCurrentCapacity(), 0);
assertEquals(12, queueInfo.getChildQueues().size(), 0);
assertEquals(1, queueInfo.getAccessibleNodeLabels().size());
}
@Test
public void testSubClusterGetQueueInfo() throws IOException, YarnException {
// We have set up a unit test where we access queue information for subcluster1.
GetQueueInfoResponse response = interceptor.getQueueInfo(
GetQueueInfoRequest.newInstance("root", true, true, true, "1"));
assertNotNull(response);
QueueInfo queueInfo = response.getQueueInfo();
assertNotNull(queueInfo);
assertEquals("root", queueInfo.getQueueName());
assertEquals(1.0, queueInfo.getCapacity(), 0);
assertEquals(0.0, queueInfo.getCurrentCapacity(), 0);
assertEquals(3, queueInfo.getChildQueues().size(), 0);
assertEquals(1, queueInfo.getAccessibleNodeLabels().size());
}
@Test
public void testGetResourceProfiles() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Resource Profiles request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getResourceProfiles request.",
() -> interceptor.getResourceProfiles(null));
// normal request
GetAllResourceProfilesRequest request = GetAllResourceProfilesRequest.newInstance();
GetAllResourceProfilesResponse response = interceptor.getResourceProfiles(request);
assertNotNull(response);
Map<String, Resource> resProfiles = response.getResourceProfiles();
Resource maxResProfiles = resProfiles.get("maximum");
assertEquals(32768, maxResProfiles.getMemorySize());
assertEquals(16, maxResProfiles.getVirtualCores());
Resource defaultResProfiles = resProfiles.get("default");
assertEquals(8192, defaultResProfiles.getMemorySize());
assertEquals(8, defaultResProfiles.getVirtualCores());
Resource minimumResProfiles = resProfiles.get("minimum");
assertEquals(4096, minimumResProfiles.getMemorySize());
assertEquals(4, minimumResProfiles.getVirtualCores());
}
@Test
public void testGetResourceProfile() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Resource Profile request.");
// null request
LambdaTestUtils.intercept(YarnException.class,
"Missing getResourceProfile request or profileName.",
() -> interceptor.getResourceProfile(null));
// normal request
GetResourceProfileRequest request = GetResourceProfileRequest.newInstance("maximum");
GetResourceProfileResponse response = interceptor.getResourceProfile(request);
assertNotNull(response);
assertEquals(32768, response.getResource().getMemorySize());
assertEquals(16, response.getResource().getVirtualCores());
GetResourceProfileRequest request2 = GetResourceProfileRequest.newInstance("default");
GetResourceProfileResponse response2 = interceptor.getResourceProfile(request2);
assertNotNull(response2);
assertEquals(8192, response2.getResource().getMemorySize());
assertEquals(8, response2.getResource().getVirtualCores());
GetResourceProfileRequest request3 = GetResourceProfileRequest.newInstance("minimum");
GetResourceProfileResponse response3 = interceptor.getResourceProfile(request3);
assertNotNull(response3);
assertEquals(4096, response3.getResource().getMemorySize());
assertEquals(4, response3.getResource().getVirtualCores());
}
@Test
public void testGetAttributesToNodes() throws Exception {
LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " +
"or nodeAttributes.", () -> interceptor.getAttributesToNodes(null));
// normal request
GetAttributesToNodesResponse response =
interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance());
assertNotNull(response);
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
assertNotNull(attrs);
assertTrue(attrs.size() == 4 || attrs.size() == 5);
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
NodeToAttributeValue attributeValue1 =
NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue());
NodeAttributeKey gpuKey = gpu.getAttributeKey();
assertTrue(attrs.get(gpuKey).contains(attributeValue1));
}
@Test
public void testClusterNodeAttributes() throws Exception {
LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.",
() -> interceptor.getClusterNodeAttributes(null));
// normal request
GetClusterNodeAttributesResponse response =
interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance());
assertNotNull(response);
Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
assertNotNull(nodeAttributeInfos);
assertTrue(nodeAttributeInfos.size() == 4 || nodeAttributeInfos.size() == 5);
NodeAttributeInfo nodeAttributeInfo1 =
NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
NodeAttributeType.STRING);
assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
NodeAttributeInfo nodeAttributeInfo2 =
NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"),
NodeAttributeType.STRING);
assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
}
@Test
public void testNodesToAttributes() throws Exception {
LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request.");
// null request
LambdaTestUtils.intercept(YarnException.class,
"Missing getNodesToAttributes request or hostNames.",
() -> interceptor.getNodesToAttributes(null));
// normal request
Set<String> hostNames = Collections.singleton("0-host1");
GetNodesToAttributesResponse response =
interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames));
assertNotNull(response);
Map<String, Set<NodeAttribute>> nodeAttributeMap = response.getNodeToAttributes();
assertNotNull(nodeAttributeMap);
assertEquals(1, nodeAttributeMap.size());
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvida");
assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
}
@Test
public void testGetNewReservation() throws Exception {
LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
// null request
LambdaTestUtils.intercept(YarnException.class,
"Missing getNewReservation request.", () -> interceptor.getNewReservation(null));
// normal request
GetNewReservationRequest request = GetNewReservationRequest.newInstance();
GetNewReservationResponse response = interceptor.getNewReservation(request);
assertNotNull(response);
ReservationId reservationId = response.getReservationId();
assertNotNull(reservationId);
assertTrue(reservationId.toString().contains("reservation"));
assertEquals(reservationId.getClusterTimestamp(), ResourceManager.getClusterTimeStamp());
}
@Test
public void testSubmitReservation() throws Exception {
LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
// get new reservationId
GetNewReservationRequest request = GetNewReservationRequest.newInstance();
GetNewReservationResponse response = interceptor.getNewReservation(request);
assertNotNull(response);
// Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance(
rDefinition, "decided", reservationId);
ReservationSubmissionResponse submissionResponse =
interceptor.submitReservation(rSubmissionRequest);
assertNotNull(submissionResponse);
SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId);
assertNotNull(subClusterId);
assertTrue(subClusters.contains(subClusterId));
}
@Test
public void testSubmitReservationEmptyRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : SubmitReservation request empty.");
String errorMsg =
"Missing submitReservation request or reservationId or reservation definition or queue.";
// null request1
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> interceptor.submitReservation(null));
// null request2
ReservationSubmissionRequest request2 =
ReservationSubmissionRequest.newInstance(null, null, null);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> interceptor.submitReservation(request2));
// null request3
ReservationSubmissionRequest request3 =
ReservationSubmissionRequest.newInstance(null, "q1", null);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> interceptor.submitReservation(request3));
// null request4
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
ReservationSubmissionRequest request4 =
ReservationSubmissionRequest.newInstance(null, null, reservationId);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> interceptor.submitReservation(request4));
// null request5
long arrival = Time.now();
long deadline = arrival + (int)(DEFAULT_DURATION * 1.1);
ReservationRequest rRequest = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 1, 1, DEFAULT_DURATION);
ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
ReservationDefinition rDefinition = createReservationDefinition(arrival, deadline, rRequests,
ReservationRequestInterpreter.R_ALL, "u1");
ReservationSubmissionRequest request5 =
ReservationSubmissionRequest.newInstance(rDefinition, null, reservationId);
LambdaTestUtils.intercept(YarnException.class, errorMsg,
() -> interceptor.submitReservation(request5));
}
@Test
public void testSubmitReservationMultipleSubmission() throws Exception {
LOG.info("Test FederationClientInterceptor: Submit Reservation - Multiple");
// get new reservationId
GetNewReservationRequest request = GetNewReservationRequest.newInstance();
GetNewReservationResponse response = interceptor.getNewReservation(request);
assertNotNull(response);
// First Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance(
rDefinition, "decided", reservationId);
ReservationSubmissionResponse submissionResponse =
interceptor.submitReservation(rSubmissionRequest);
assertNotNull(submissionResponse);
SubClusterId subClusterId1 = stateStoreUtil.queryReservationHomeSC(reservationId);
assertNotNull(subClusterId1);
assertTrue(subClusters.contains(subClusterId1));
// First Retry, repeat the submission
ReservationSubmissionResponse submissionResponse1 =
interceptor.submitReservation(rSubmissionRequest);
assertNotNull(submissionResponse1);
// Expect reserved clusters to be consistent
SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId);
assertNotNull(subClusterId2);
assertEquals(subClusterId1, subClusterId2);
}
@Test
public void testUpdateReservation() throws Exception {
LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
// get new reservationId
GetNewReservationRequest request = GetNewReservationRequest.newInstance();
GetNewReservationResponse response = interceptor.getNewReservation(request);
assertNotNull(response);
// allow plan follower to synchronize, manually trigger an assignment
Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
for (MockRM mockRM : mockRMs.values()) {
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan("root.decided", true);
}
// Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance(
rDefinition, "decided", reservationId);
ReservationSubmissionResponse submissionResponse =
interceptor.submitReservation(rSubmissionRequest);
assertNotNull(submissionResponse);
// Update Reservation
ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
ReservationUpdateRequest updateRequest =
ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
ReservationUpdateResponse updateResponse =
interceptor.updateReservation(updateRequest);
assertNotNull(updateResponse);
SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId);
assertNotNull(subClusterId);
}
@Test
public void testDeleteReservation() throws Exception {
LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
// get new reservationId
GetNewReservationRequest request = GetNewReservationRequest.newInstance();
GetNewReservationResponse response = interceptor.getNewReservation(request);
assertNotNull(response);
// allow plan follower to synchronize, manually trigger an assignment
Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
for (MockRM mockRM : mockRMs.values()) {
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan("root.decided", true);
}
// Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance(
rDefinition, "decided", reservationId);
ReservationSubmissionResponse submissionResponse =
interceptor.submitReservation(rSubmissionRequest);
assertNotNull(submissionResponse);
// Delete Reservation
ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance(reservationId);
ReservationDeleteResponse deleteResponse = interceptor.deleteReservation(deleteRequest);
assertNotNull(deleteResponse);
LambdaTestUtils.intercept(YarnException.class,
"Reservation " + reservationId + " does not exist",
() -> stateStoreUtil.queryReservationHomeSC(reservationId));
}
private ReservationDefinition createReservationDefinition(int memory, int core) {
// get reservationId
long arrival = Time.now();
long deadline = arrival + (int)(DEFAULT_DURATION * 1.1);
ReservationRequest rRequest = ReservationRequest.newInstance(
Resource.newInstance(memory, core), 1, 1, DEFAULT_DURATION);
ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
ReservationDefinition rDefinition = createReservationDefinition(arrival, deadline, rRequests,
ReservationRequestInterpreter.R_ALL, "u1");
return rDefinition;
}
/**
* This method is used to create a ReservationDefinition.
*
* @param arrival Job arrival time
* @param deadline Job deadline
* @param reservationRequests reservationRequest Array
* @param rType Enumeration of various types of
* dependencies among multiple ReservationRequest
* @param username username
* @return ReservationDefinition
*/
private ReservationDefinition createReservationDefinition(long arrival,
long deadline, ReservationRequest[] reservationRequests,
ReservationRequestInterpreter rType, String username) {
ReservationRequests requests = ReservationRequests
.newInstance(Arrays.asList(reservationRequests), rType);
return ReservationDefinition.newInstance(arrival, deadline,
requests, username, "0", Priority.UNDEFINED);
}
@Test
public void testGetNumMinThreads() {
// If we don't configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE,
// we expect to get 5 threads
int minThreads = interceptor.getNumMinThreads(this.getConf());
assertEquals(5, minThreads);
// If we configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE,
// we expect to get 3 threads
this.getConf().unset(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE);
this.getConf().setInt(YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE, 3);
int minThreads2 = interceptor.getNumMinThreads(this.getConf());
assertEquals(3, minThreads2);
}
@Test
public void testGetNumMaxThreads() {
// If we don't configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE,
// we expect to get 5 threads
int minThreads = interceptor.getNumMaxThreads(this.getConf());
assertEquals(5, minThreads);
// If we configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE,
// we expect to get 8 threads
this.getConf().unset(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE);
this.getConf().setInt(YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE, 8);
int minThreads2 = interceptor.getNumMaxThreads(this.getConf());
assertEquals(8, minThreads2);
}
@Test
public void testGetDelegationToken() throws IOException, YarnException {
// We design such a unit test to check
// that the execution of the GetDelegationToken method is as expected.
//
// 1. Apply for a DelegationToken for renewer1,
// the Router returns the DelegationToken of the user, and the KIND of the token is
// RM_DELEGATION_TOKEN
//
// 2. We maintain the compatibility with RMDelegationTokenIdentifier,
// we can serialize the token into RMDelegationTokenIdentifier.
//
// 3. We can get the issueDate, and compare the data in the StateStore,
// the data should be consistent.
// Step1. We apply for DelegationToken for renewer1
// Both response & delegationToken cannot be empty
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer1");
GetDelegationTokenResponse response = interceptor.getDelegationToken(request);
assertNotNull(response);
Token delegationToken = response.getRMDelegationToken();
assertNotNull(delegationToken);
assertEquals("RM_DELEGATION_TOKEN", delegationToken.getKind());
// Step2. Serialize the returned Token as RMDelegationTokenIdentifier.
org.apache.hadoop.security.token.Token<RMDelegationTokenIdentifier> token =
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
RMDelegationTokenIdentifier rMDelegationTokenIdentifier = token.decodeIdentifier();
assertNotNull(rMDelegationTokenIdentifier);
// Step3. Verify the returned data of the token.
String renewer = rMDelegationTokenIdentifier.getRenewer().toString();
long issueDate = rMDelegationTokenIdentifier.getIssueDate();
long maxDate = rMDelegationTokenIdentifier.getMaxDate();
assertEquals("renewer1", renewer);
long tokenMaxLifetime = this.getConf().getLong(
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
assertEquals(issueDate + tokenMaxLifetime, maxDate);
RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState();
assertNotNull(managerState);
Map<RMDelegationTokenIdentifier, RouterStoreToken> delegationTokenState =
managerState.getTokenState();
assertNotNull(delegationTokenState);
assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier));
long tokenRenewInterval = this.getConf().getLong(
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
RouterStoreToken resultRouterStoreToken = delegationTokenState.get(rMDelegationTokenIdentifier);
assertNotNull(resultRouterStoreToken);
long renewDate = resultRouterStoreToken.getRenewDate();
assertEquals(issueDate + tokenRenewInterval, renewDate);
}
@Test
public void testRenewDelegationToken() throws IOException, YarnException {
// We design such a unit test to check
// that the execution of the GetDelegationToken method is as expected
// 1. Call GetDelegationToken to apply for delegationToken.
// 2. Call renewDelegationToken to refresh delegationToken.
// By looking at the code of AbstractDelegationTokenSecretManager#renewToken,
// we know that renewTime is calculated as Math.min(id.getMaxDate(), now + tokenRenewInterval)
// so renewTime will be less than or equal to maxDate.
// 3. We will compare whether the expirationTime returned to the
// client is consistent with the renewDate in the stateStore.
// Step1. Call GetDelegationToken to apply for delegationToken.
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer2");
GetDelegationTokenResponse response = interceptor.getDelegationToken(request);
assertNotNull(response);
Token delegationToken = response.getRMDelegationToken();
org.apache.hadoop.security.token.Token<RMDelegationTokenIdentifier> token =
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
RMDelegationTokenIdentifier rMDelegationTokenIdentifier = token.decodeIdentifier();
String renewer = rMDelegationTokenIdentifier.getRenewer().toString();
long maxDate = rMDelegationTokenIdentifier.getMaxDate();
assertEquals("renewer2", renewer);
// Step2. Call renewDelegationToken to refresh delegationToken.
RenewDelegationTokenRequest renewRequest = Records.newRecord(RenewDelegationTokenRequest.class);
renewRequest.setDelegationToken(delegationToken);
RenewDelegationTokenResponse renewResponse = interceptor.renewDelegationToken(renewRequest);
assertNotNull(renewResponse);
long expDate = renewResponse.getNextExpirationTime();
assertTrue(expDate <= maxDate);
// Step3. Compare whether the expirationTime returned to
// the client is consistent with the renewDate in the stateStore
RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState();
Map<RMDelegationTokenIdentifier, RouterStoreToken> delegationTokenState =
managerState.getTokenState();
assertNotNull(delegationTokenState);
assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier));
RouterStoreToken resultRouterStoreToken = delegationTokenState.get(rMDelegationTokenIdentifier);
assertNotNull(resultRouterStoreToken);
long renewDate = resultRouterStoreToken.getRenewDate();
assertEquals(expDate, renewDate);
}
@Test
public void testCancelDelegationToken() throws IOException, YarnException {
// We design such a unit test to check
// that the execution of the CancelDelegationToken method is as expected
// 1. Call GetDelegationToken to apply for delegationToken.
// 2. Call CancelDelegationToken to cancel delegationToken.
// 3. Query the data in the StateStore and confirm that the Delegation has been deleted.
// Step1. Call GetDelegationToken to apply for delegationToken.
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer3");
GetDelegationTokenResponse response = interceptor.getDelegationToken(request);
assertNotNull(response);
Token delegationToken = response.getRMDelegationToken();
// Step2. Call CancelDelegationToken to cancel delegationToken.
CancelDelegationTokenRequest cancelTokenRequest =
CancelDelegationTokenRequest.newInstance(delegationToken);
CancelDelegationTokenResponse cancelTokenResponse =
interceptor.cancelDelegationToken(cancelTokenRequest);
assertNotNull(cancelTokenResponse);
// Step3. Query the data in the StateStore and confirm that the Delegation has been deleted.
// At this point, the size of delegationTokenState should be 0.
RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState();
Map<RMDelegationTokenIdentifier, RouterStoreToken> delegationTokenState =
managerState.getTokenState();
assertNotNull(delegationTokenState);
assertEquals(0, delegationTokenState.size());
}
}