TestFederationInterceptorREST.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.Principal;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntry;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.dao.ConfInfo;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY;
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION;
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the
* {@code FederationInterceptor} class. The tests for
* {@code RouterClientRMService} has been written cleverly so that it can be
* reused to validate different request interceptor chains.
*/
public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
private final static int NUM_SUBCLUSTER = 4;
private static final int BAD_REQUEST = 400;
private static final int ACCEPTED = 202;
private static final String TEST_USER = "test-user";
private static final int OK = 200;
private static String user = "test-user";
private TestableFederationInterceptorREST interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private List<SubClusterId> subClusters;
private static final String TEST_RENEWER = "test-renewer";
@BeforeEach
public void setUp() throws YarnException, IOException {
super.setUpConfig();
interceptor = new TestableFederationInterceptorREST();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance(this.getConf()).reinitialize(stateStore,
this.getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(TEST_USER);
subClusters = new ArrayList<>();
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
stateStoreUtil.registerSubCluster(sc);
subClusters.add(sc);
}
RouterClientRMService routerClientRMService = new RouterClientRMService();
routerClientRMService.initUserPipelineMap(getConf());
long secretKeyInterval = this.getConf().getLong(
RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime = this.getConf().getLong(
RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval = this.getConf().getLong(
RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
long removeScanInterval = this.getConf().getTimeDuration(
RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,
RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
RouterDelegationTokenSecretManager tokenSecretManager = new RouterDelegationTokenSecretManager(
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, removeScanInterval,
this.getConf());
tokenSecretManager.startThreads();
routerClientRMService.setRouterDTSecretManager(tokenSecretManager);
TestableFederationClientInterceptor clientInterceptor =
new TestableFederationClientInterceptor();
clientInterceptor.setConf(this.getConf());
clientInterceptor.init(TEST_RENEWER);
clientInterceptor.setTokenSecretManager(tokenSecretManager);
RequestInterceptorChainWrapper wrapper = new RequestInterceptorChainWrapper();
wrapper.init(clientInterceptor);
routerClientRMService.getUserPipelineMap().put(TEST_RENEWER, wrapper);
interceptor.setRouterClientRMService(routerClientRMService);
for (SubClusterId subCluster : subClusters) {
SubClusterInfo subClusterInfo = stateStoreUtil.querySubClusterInfo(subCluster);
interceptor.getOrCreateInterceptorForSubCluster(
subCluster, subClusterInfo.getRMWebServiceAddress());
}
interceptor.setupResourceManager();
}
@AfterEach
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
MockDefaultRequestInterceptorREST.class.getName());
String mockPassThroughInterceptorClass =
PassThroughRESTRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain is the federation interceptor that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + ","
+ TestableFederationInterceptorREST.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
// Open AppsInfo Cache
conf.setBoolean(YarnConfiguration.ROUTER_APPSINFO_ENABLED, true);
conf.setInt(YarnConfiguration.ROUTER_APPSINFO_CACHED_COUNT, 10);
return conf;
}
/**
* This test validates the correctness of GetNewApplication. The return
* ApplicationId has to belong to one of the SubCluster in the cluster.
*/
@Test
public void testGetNewApplication() throws IOException, InterruptedException {
Response response = interceptor.createNewApplication(null);
assertNotNull(response);
NewApplication ci = (NewApplication) response.getEntity();
assertNotNull(ci);
ApplicationId appId = ApplicationId.fromString(ci.getApplicationId());
assertTrue(appId.getClusterTimestamp() < NUM_SUBCLUSTER);
assertTrue(appId.getClusterTimestamp() >= 0);
}
/**
* This test validates the correctness of SubmitApplication. The application
* has to be submitted to one of the SubCluster in the cluster.
*/
@Test
public void testSubmitApplication()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
assertEquals(ACCEPTED, response.getStatus());
SubClusterId ci = (SubClusterId) response.getEntity();
assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(scIdResult);
assertTrue(subClusters.contains(scIdResult));
assertEquals(ci, scIdResult);
}
/**
* This test validates the correctness of SubmitApplication in case of
* multiple submission. The first retry has to be submitted to the same
* SubCluster of the first attempt.
*/
@Test
public void testSubmitApplicationMultipleSubmission()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// First attempt
Response response = interceptor.submitApplication(context, null);
assertNotNull(response);
assertEquals(ACCEPTED, response.getStatus());
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(scIdResult);
// First retry
response = interceptor.submitApplication(context, null);
assertNotNull(response);
assertEquals(ACCEPTED, response.getStatus());
SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
assertNotNull(scIdResult2);
assertEquals(scIdResult, scIdResult2);
}
/**
* This test validates the correctness of SubmitApplication in case of empty
* request.
*/
@Test
public void testSubmitApplicationEmptyRequest() throws IOException, InterruptedException {
// ApplicationSubmissionContextInfo null
Response response = interceptor.submitApplication(null, null);
assertEquals(BAD_REQUEST, response.getStatus());
// ApplicationSubmissionContextInfo empty
response = interceptor
.submitApplication(new ApplicationSubmissionContextInfo(), null);
assertEquals(BAD_REQUEST, response.getStatus());
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
response = interceptor.submitApplication(context, null);
assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of SubmitApplication in case of
* application in wrong format.
*/
@Test
public void testSubmitApplicationWrongFormat() throws IOException, InterruptedException {
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId("Application_wrong_id");
Response response = interceptor.submitApplication(context, null);
assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of ForceKillApplication in case the
* application exists in the cluster.
*/
@Test
public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we are going to kill later
Response response = interceptor.submitApplication(context, null);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
AppState appState = new AppState("KILLED");
Response responseKill =
interceptor.updateAppState(appState, null, appId.toString());
assertNotNull(responseKill);
}
/**
* This test validates the correctness of ForceKillApplication in case of
* application does not exist in StateStore.
*/
@Test
public void testForceKillApplicationNotExists()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
AppState appState = new AppState("KILLED");
Response response =
interceptor.updateAppState(appState, null, appId.toString());
assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of ForceKillApplication in case of
* application in wrong format.
*/
@Test
public void testForceKillApplicationWrongFormat()
throws YarnException, IOException, InterruptedException {
AppState appState = new AppState("KILLED");
Response response =
interceptor.updateAppState(appState, null, "Application_wrong_id");
assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of ForceKillApplication in case of
* empty request.
*/
@Test
public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we are going to kill later
interceptor.submitApplication(context, null);
Response response =
interceptor.updateAppState(null, null, appId.toString());
assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationReport()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we want the report later
Response response = interceptor.submitApplication(context, null);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
AppInfo responseGet = interceptor.getApp(null, appId.toString(), null);
assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationNotExists() {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppInfo response = interceptor.getApp(null, appId.toString(), null);
assertNull(response);
}
/**
* This test validates the correctness of GetApplicationReport in case of
* application in wrong format.
*/
@Test
public void testGetApplicationWrongFormat() {
IllegalArgumentException illegalArgumentException =
assertThrows(IllegalArgumentException.class, () -> {
interceptor.getApp(null, "Application_wrong_id", null);
});
assertTrue(illegalArgumentException.getMessage().contains(
"Invalid ApplicationId prefix: Application_wrong_id."));
}
/**
* This test validates the correctness of GetApplicationsReport in case each
* subcluster provided one application.
*/
@Test
public void testGetApplicationsReport() {
AppsInfo responseGet = interceptor.getApps(null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null);
assertNotNull(responseGet);
assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
// The merged operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case each subcluster
* provided one node with the LastHealthUpdate set to the SubClusterId. The
* expected result would be the NodeInfo from the last SubCluster that has
* LastHealthUpdate equal to Num_SubCluster -1.
*/
@Test
public void testGetNode() {
NodeInfo responseGet = interceptor.getNode("testGetNode");
assertNotNull(responseGet);
assertEquals(NUM_SUBCLUSTER - 1, responseGet.getLastHealthUpdate());
}
/**
* This test validates the correctness of GetNodes in case each subcluster
* provided one node.
*/
@Test
public void testGetNodes() {
NodesInfo responseGet = interceptor.getNodes(null);
assertNotNull(responseGet);
assertEquals(NUM_SUBCLUSTER, responseGet.getNodes().size());
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of updateNodeResource().
*/
@Test
public void testUpdateNodeResource() {
List<NodeInfo> nodes = interceptor.getNodes(null).getNodes();
assertFalse(nodes.isEmpty());
final String nodeId = nodes.get(0).getNodeId();
ResourceOptionInfo resourceOption = new ResourceOptionInfo(
ResourceOption.newInstance(
Resource.newInstance(2048, 3), 1000));
ResourceInfo resource = interceptor.updateNodeResource(
null, nodeId, resourceOption);
assertNotNull(resource);
assertEquals(2048, resource.getMemorySize());
assertEquals(3, resource.getvCores());
}
/**
* This test validates the correctness of getClusterMetricsInfo in case each
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to the
* SubClusterId. The expected result would be appSubmitted equals to the sum
* of SubClusterId. SubClusterId in this case is an integer.
*/
@Test
public void testGetClusterMetrics() {
ClusterMetricsInfo responseGet = interceptor.getClusterMetricsInfo();
assertNotNull(responseGet);
int expectedAppSubmitted = 0;
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
expectedAppSubmitted += i;
}
assertEquals(expectedAppSubmitted, responseGet.getAppsSubmitted());
// The merge operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetApplicationState in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationState()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we want the report later
Response response = interceptor.submitApplication(context, null);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
AppState responseGet = interceptor.getAppState(null, appId.toString());
assertNotNull(responseGet);
assertEquals(MockDefaultRequestInterceptorREST.APP_STATE_RUNNING,
responseGet.getState());
}
/**
* This test validates the correctness of GetApplicationState in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationStateNotExists() throws IOException {
ApplicationId appId =
ApplicationId.newInstance(Time.now(), 1);
AppState response = interceptor.getAppState(null, appId.toString());
assertNull(response);
}
/**
* This test validates the correctness of GetApplicationState in case of
* application in wrong format.
*/
@Test
public void testGetApplicationStateWrongFormat()
throws IOException {
AppState response = interceptor.getAppState(null, "Application_wrong_id");
assertNull(response);
}
/**
* This test validates the creation of new interceptor in case of a
* RMSwitchover in a subCluster.
*/
@Test
public void testRMSwitchoverOfOneSC() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance(Integer.toString(0));
interceptor.getClusterMetricsInfo();
assertEquals("http://1.2.3.4:4", interceptor
.getInterceptorForSubCluster(subClusterId).getWebAppAddress());
//Register the first subCluster with secondRM simulating RMSwitchover
registerSubClusterWithSwitchoverRM(subClusterId);
interceptor.getClusterMetricsInfo();
assertEquals("http://5.6.7.8:8", interceptor
.getInterceptorForSubCluster(subClusterId).getWebAppAddress());
}
private void registerSubClusterWithSwitchoverRM(SubClusterId subClusterId)
throws YarnException {
String amRMAddress = "5.6.7.8:5";
String clientRMAddress = "5.6.7.8:6";
String rmAdminAddress = "5.6.7.8:7";
String webAppAddress = "5.6.7.8:8";
SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
"capability");
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
@Test
public void testGetContainers()
throws YarnException, IOException, InterruptedException {
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we want the report later
Response response = interceptor.submitApplication(context, null);
assertNotNull(response);
assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
ApplicationAttemptId appAttempt = ApplicationAttemptId.newInstance(appId, 1);
ContainersInfo responseGet = interceptor.getContainers(
null, null, appId.toString(), appAttempt.toString());
assertEquals(4, responseGet.getContainers().size());
}
@Test
public void testGetContainersNotExists() throws Exception {
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the appAttemptId is empty or null.",
() -> interceptor.getContainers(null, null, appId.toString(), null));
}
@Test
public void testGetContainersWrongFormat() throws Exception {
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationAttemptId appAttempt = ApplicationAttemptId.newInstance(appId, 1);
// Test Case 1: appId is wrong format, appAttemptId is accurate.
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid ApplicationId prefix: Application_wrong_id. " +
"The valid ApplicationId should start with prefix application",
() -> interceptor.getContainers(null, null, "Application_wrong_id", appAttempt.toString()));
// Test Case2: appId is accurate, appAttemptId is wrong format.
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid AppAttemptId prefix: AppAttempt_wrong_id",
() -> interceptor.getContainers(null, null, appId.toString(), "AppAttempt_wrong_id"));
}
@Test
public void testGetNodeToLabels() throws IOException {
NodeToLabelsInfo info = interceptor.getNodeToLabels(null);
HashMap<String, NodeLabelsInfo> map = info.getNodeToLabels();
assertNotNull(map);
assertEquals(2, map.size());
NodeLabelsInfo node1Value = map.getOrDefault("node1", null);
assertNotNull(node1Value);
assertEquals(1, node1Value.getNodeLabelsName().size());
assertEquals("CPU", node1Value.getNodeLabelsName().get(0));
NodeLabelsInfo node2Value = map.getOrDefault("node2", null);
assertNotNull(node2Value);
assertEquals(1, node2Value.getNodeLabelsName().size());
assertEquals("GPU", node2Value.getNodeLabelsName().get(0));
}
@Test
public void testGetLabelsToNodes() throws Exception {
LabelsToNodesInfo labelsToNodesInfo = interceptor.getLabelsToNodes(null);
Map<NodeLabelInfo, NodeIDsInfo> map = labelsToNodesInfo.getLabelsToNodes();
assertNotNull(map);
assertEquals(3, map.size());
NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabelInfo nodeLabelInfoX = new NodeLabelInfo(labelX);
NodeIDsInfo nodeIDsInfoX = map.get(nodeLabelInfoX);
assertNotNull(nodeIDsInfoX);
assertEquals(2, nodeIDsInfoX.getNodeIDs().size());
Resource resourceX =
nodeIDsInfoX.getPartitionInfo().getResourceAvailable().getResource();
assertNotNull(resourceX);
assertEquals(4*10, resourceX.getVirtualCores());
assertEquals(4*20*1024, resourceX.getMemorySize());
NodeLabel labelY = NodeLabel.newInstance("y", false);
NodeLabelInfo nodeLabelInfoY = new NodeLabelInfo(labelY);
NodeIDsInfo nodeIDsInfoY = map.get(nodeLabelInfoY);
assertNotNull(nodeIDsInfoY);
assertEquals(2, nodeIDsInfoY.getNodeIDs().size());
Resource resourceY =
nodeIDsInfoY.getPartitionInfo().getResourceAvailable().getResource();
assertNotNull(resourceY);
assertEquals(4*20, resourceY.getVirtualCores());
assertEquals(4*40*1024, resourceY.getMemorySize());
}
@Test
public void testGetClusterNodeLabels() throws Exception {
NodeLabelsInfo nodeLabelsInfo = interceptor.getClusterNodeLabels(null);
assertNotNull(nodeLabelsInfo);
assertEquals(2, nodeLabelsInfo.getNodeLabelsName().size());
List<String> nodeLabelsName = nodeLabelsInfo.getNodeLabelsName();
assertNotNull(nodeLabelsName);
assertTrue(nodeLabelsName.contains("cpu"));
assertTrue(nodeLabelsName.contains("gpu"));
ArrayList<NodeLabelInfo> nodeLabelInfos = nodeLabelsInfo.getNodeLabelsInfo();
assertNotNull(nodeLabelInfos);
assertEquals(2, nodeLabelInfos.size());
NodeLabelInfo cpuNodeLabelInfo = new NodeLabelInfo("cpu", false);
assertTrue(nodeLabelInfos.contains(cpuNodeLabelInfo));
NodeLabelInfo gpuNodeLabelInfo = new NodeLabelInfo("gpu", false);
assertTrue(nodeLabelInfos.contains(gpuNodeLabelInfo));
}
@Test
public void testGetLabelsOnNode() throws Exception {
NodeLabelsInfo nodeLabelsInfo = interceptor.getLabelsOnNode(null, "node1");
assertNotNull(nodeLabelsInfo);
assertEquals(2, nodeLabelsInfo.getNodeLabelsName().size());
List<String> nodeLabelsName = nodeLabelsInfo.getNodeLabelsName();
assertNotNull(nodeLabelsName);
assertTrue(nodeLabelsName.contains("x"));
assertTrue(nodeLabelsName.contains("y"));
// null request
interceptor.setAllowPartialResult(false);
NodeLabelsInfo nodeLabelsInfo2 = interceptor.getLabelsOnNode(null, "node2");
assertNotNull(nodeLabelsInfo2);
assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
}
@Test
public void testGetContainer() throws Exception {
//
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId appContainerId = ContainerId.newContainerId(appAttemptId, 1);
String applicationId = appId.toString();
String attemptId = appAttemptId.toString();
String containerId = appContainerId.toString();
// Submit application to multiSubCluster
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(applicationId);
assertNotNull(interceptor.submitApplication(context, null));
// Test Case1: Wrong ContainerId
LambdaTestUtils.intercept(IllegalArgumentException.class, "Invalid ContainerId prefix: 0",
() -> interceptor.getContainer(null, null, applicationId, attemptId, "0"));
// Test Case2: Correct ContainerId
ContainerInfo containerInfo = interceptor.getContainer(null, null, applicationId,
attemptId, containerId);
assertNotNull(containerInfo);
}
@Test
public void testGetAppAttempts() throws IOException, InterruptedException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
assertNotNull(interceptor.submitApplication(context, null));
AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString());
assertNotNull(appAttemptsInfo);
ArrayList<AppAttemptInfo> attemptLists = appAttemptsInfo.getAttempts();
assertNotNull(appAttemptsInfo);
assertEquals(2, attemptLists.size());
AppAttemptInfo attemptInfo1 = attemptLists.get(0);
assertNotNull(attemptInfo1);
assertEquals(0, attemptInfo1.getAttemptId());
assertEquals("AppAttemptId_0", attemptInfo1.getAppAttemptId());
assertEquals("LogLink_0", attemptInfo1.getLogsLink());
assertEquals(1659621705L, attemptInfo1.getFinishedTime());
AppAttemptInfo attemptInfo2 = attemptLists.get(1);
assertNotNull(attemptInfo2);
assertEquals(0, attemptInfo2.getAttemptId());
assertEquals("AppAttemptId_1", attemptInfo2.getAppAttemptId());
assertEquals("LogLink_1", attemptInfo2.getLogsLink());
assertEquals(1659621705L, attemptInfo2.getFinishedTime());
}
@Test
public void testGetAppAttempt() throws IOException, InterruptedException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
assertNotNull(interceptor.submitApplication(context, null));
ApplicationAttemptId expectAppAttemptId = ApplicationAttemptId.newInstance(appId, 1);
String appAttemptId = expectAppAttemptId.toString();
org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo
appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), appAttemptId);
assertNotNull(appAttemptInfo);
assertEquals(expectAppAttemptId.toString(), appAttemptInfo.getAppAttemptId());
assertEquals("url", appAttemptInfo.getTrackingUrl());
assertEquals("oUrl", appAttemptInfo.getOriginalTrackingUrl());
assertEquals(124, appAttemptInfo.getRpcPort());
assertEquals("host", appAttemptInfo.getHost());
}
@Test
public void testGetAppTimeout() throws IOException, InterruptedException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
assertNotNull(interceptor.submitApplication(context, null));
ApplicationTimeoutType appTimeoutType = ApplicationTimeoutType.LIFETIME;
AppTimeoutInfo appTimeoutInfo =
interceptor.getAppTimeout(null, appId.toString(), appTimeoutType.toString());
assertNotNull(appTimeoutInfo);
assertEquals(10, appTimeoutInfo.getRemainingTimeInSec());
assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime());
assertEquals(appTimeoutType, appTimeoutInfo.getTimeoutType());
}
@Test
public void testGetAppTimeouts() throws IOException, InterruptedException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
assertNotNull(interceptor.submitApplication(context, null));
AppTimeoutsInfo appTimeoutsInfo = interceptor.getAppTimeouts(null, appId.toString());
assertNotNull(appTimeoutsInfo);
List<AppTimeoutInfo> timeouts = appTimeoutsInfo.getAppTimeouts();
assertNotNull(timeouts);
assertEquals(1, timeouts.size());
AppTimeoutInfo resultAppTimeout = timeouts.get(0);
assertNotNull(resultAppTimeout);
assertEquals(10, resultAppTimeout.getRemainingTimeInSec());
assertEquals("UNLIMITED", resultAppTimeout.getExpireTime());
assertEquals(ApplicationTimeoutType.LIFETIME, resultAppTimeout.getTimeoutType());
}
@Test
public void testUpdateApplicationTimeout() throws IOException, InterruptedException,
YarnException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
assertNotNull(interceptor.submitApplication(context, null));
long newLifetime = 10L;
// update 10L seconds more to timeout
String timeout = Times.formatISO8601(Time.now() + newLifetime * 1000);
AppTimeoutInfo paramAppTimeOut = new AppTimeoutInfo();
paramAppTimeOut.setExpiryTime(timeout);
// RemainingTime = Math.max((timeoutInMillis - System.currentTimeMillis()) / 1000, 0))
paramAppTimeOut.setRemainingTime(newLifetime);
paramAppTimeOut.setTimeoutType(ApplicationTimeoutType.LIFETIME);
Response response =
interceptor.updateApplicationTimeout(paramAppTimeOut, null, appId.toString());
assertNotNull(response);
AppTimeoutInfo entity = (AppTimeoutInfo) response.getEntity();
assertNotNull(entity);
assertEquals(paramAppTimeOut.getExpireTime(), entity.getExpireTime());
assertEquals(paramAppTimeOut.getTimeoutType(), entity.getTimeoutType());
assertEquals(paramAppTimeOut.getRemainingTimeInSec(), entity.getRemainingTimeInSec());
}
@Test
public void testUpdateApplicationPriority() throws IOException, InterruptedException,
YarnException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setPriority(20);
// Submit the application we are going to kill later
assertNotNull(interceptor.submitApplication(context, null));
int iPriority = 10;
// Set Priority for application
Response response = interceptor.updateApplicationPriority(
new AppPriority(iPriority), null, appId.toString());
assertNotNull(response);
AppPriority entity = (AppPriority) response.getEntity();
assertNotNull(entity);
assertEquals(iPriority, entity.getPriority());
}
@Test
public void testGetAppPriority() throws IOException, InterruptedException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
int priority = 40;
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setPriority(priority);
// Submit the application we are going to kill later
assertNotNull(interceptor.submitApplication(context, null));
// Set Priority for application
AppPriority appPriority = interceptor.getAppPriority(null, appId.toString());
assertNotNull(appPriority);
assertEquals(priority, appPriority.getPriority());
}
@Test
public void testUpdateAppQueue() throws IOException, InterruptedException,
YarnException {
String oldQueue = "oldQueue";
String newQueue = "newQueue";
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setQueue(oldQueue);
// Submit the application
assertNotNull(interceptor.submitApplication(context, null));
// Set New Queue for application
Response response = interceptor.updateAppQueue(new AppQueue(newQueue),
null, appId.toString());
assertNotNull(response);
AppQueue appQueue = (AppQueue) response.getEntity();
assertEquals(newQueue, appQueue.getQueue());
// Get AppQueue by application
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
assertNotNull(queue);
assertEquals(newQueue, queue.getQueue());
}
@Test
public void testGetAppQueue() throws IOException, InterruptedException {
String queueName = "queueName";
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setQueue(queueName);
assertNotNull(interceptor.submitApplication(context, null));
// Get Queue by application
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
assertNotNull(queue);
assertEquals(queueName, queue.getQueue());
}
@Test
public void testGetAppsInfoCache() {
AppsInfo responseGet = interceptor.getApps(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
assertNotNull(responseGet);
RouterAppInfoCacheKey cacheKey = RouterAppInfoCacheKey.newInstance(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appsInfoCache =
interceptor.getAppInfosCaches();
assertNotNull(appsInfoCache);
assertFalse(appsInfoCache.isEmpty());
assertEquals(1, appsInfoCache.size());
assertTrue(appsInfoCache.containsKey(cacheKey));
AppsInfo cacheResult = appsInfoCache.get(cacheKey);
assertNotNull(cacheResult);
assertEquals(responseGet, cacheResult);
}
@Test
public void testGetAppStatistics() throws IOException, InterruptedException, YarnException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setApplicationType("MapReduce");
context.setQueue("queue");
assertNotNull(interceptor.submitApplication(context, null));
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);
assertNotNull(response);
ApplicationHomeSubCluster homeSubCluster = response.getApplicationHomeSubCluster();
DefaultRequestInterceptorREST interceptorREST =
interceptor.getInterceptorForSubCluster(homeSubCluster.getHomeSubCluster());
MockDefaultRequestInterceptorREST mockInterceptorREST =
(MockDefaultRequestInterceptorREST) interceptorREST;
mockInterceptorREST.updateApplicationState(YarnApplicationState.RUNNING,
appId.toString());
Set<String> stateQueries = new HashSet<>();
stateQueries.add(YarnApplicationState.RUNNING.name());
Set<String> typeQueries = new HashSet<>();
typeQueries.add("MapReduce");
ApplicationStatisticsInfo response2 =
interceptor.getAppStatistics(null, stateQueries, typeQueries);
assertNotNull(response2);
assertFalse(response2.getStatItems().isEmpty());
StatisticsItemInfo result = response2.getStatItems().get(0);
assertEquals(1, result.getCount());
assertEquals(YarnApplicationState.RUNNING, result.getState());
assertEquals("MapReduce", result.getType());
}
@Test
public void testGetAppActivities() throws IOException, InterruptedException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setApplicationType("MapReduce");
context.setQueue("queue");
assertNotNull(interceptor.submitApplication(context, null));
Set<String> prioritiesSet = Collections.singleton("0");
Set<String> allocationRequestIdsSet = Collections.singleton("0");
AppActivitiesInfo appActivitiesInfo =
interceptor.getAppActivities(null, appId.toString(), String.valueOf(Time.now()),
prioritiesSet, allocationRequestIdsSet, null, "-1", null, false);
assertNotNull(appActivitiesInfo);
assertEquals(appId.toString(), appActivitiesInfo.getApplicationId());
assertEquals(10, appActivitiesInfo.getAllocations().size());
}
@Test
public void testListReservation() throws Exception {
// submitReservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
submitReservation(reservationId);
// Call the listReservation method
String applyReservationId = reservationId.toString();
Response listReservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
assertNotNull(listReservationResponse);
assertNotNull(listReservationResponse.getStatus());
Status status = Status.fromStatusCode(listReservationResponse.getStatus());
assertEquals(Status.OK, status);
Object entity = listReservationResponse.getEntity();
assertNotNull(entity);
assertNotNull(entity instanceof ReservationListInfo);
assertTrue(entity instanceof ReservationListInfo);
ReservationListInfo listInfo = (ReservationListInfo) entity;
assertNotNull(listInfo);
List<ReservationInfo> reservationInfoList = listInfo.getReservations();
assertNotNull(reservationInfoList);
assertEquals(1, reservationInfoList.size());
ReservationInfo reservationInfo = reservationInfoList.get(0);
assertNotNull(reservationInfo);
assertEquals(applyReservationId, reservationInfo.getReservationId());
ReservationDefinitionInfo definitionInfo = reservationInfo.getReservationDefinition();
assertNotNull(definitionInfo);
ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();
assertNotNull(reservationRequestsInfo);
ArrayList<ReservationRequestInfo> reservationRequestInfoList =
reservationRequestsInfo.getReservationRequest();
assertNotNull(reservationRequestInfoList);
assertEquals(1, reservationRequestInfoList.size());
ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0);
assertNotNull(reservationRequestInfo);
assertEquals(4, reservationRequestInfo.getNumContainers());
ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
assertNotNull(resourceInfo);
int vCore = resourceInfo.getvCores();
long memory = resourceInfo.getMemorySize();
assertEquals(1, vCore);
assertEquals(1024, memory);
}
@Test
public void testCreateNewReservation() throws Exception {
Response response = interceptor.createNewReservation(null);
assertNotNull(response);
Object entity = response.getEntity();
assertNotNull(entity);
assertTrue(entity instanceof NewReservation);
NewReservation newReservation = (NewReservation) entity;
assertNotNull(newReservation);
assertTrue(newReservation.getReservationId().contains("reservation"));
}
@Test
public void testSubmitReservation() throws Exception {
// submit reservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
Response response = submitReservation(reservationId);
assertNotNull(response);
assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
String applyReservationId = reservationId.toString();
Response reservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
assertNotNull(reservationResponse);
Object entity = reservationResponse.getEntity();
assertNotNull(entity);
assertNotNull(entity instanceof ReservationListInfo);
assertTrue(entity instanceof ReservationListInfo);
ReservationListInfo listInfo = (ReservationListInfo) entity;
assertNotNull(listInfo);
List<ReservationInfo> reservationInfos = listInfo.getReservations();
assertNotNull(reservationInfos);
assertEquals(1, reservationInfos.size());
ReservationInfo reservationInfo = reservationInfos.get(0);
assertNotNull(reservationInfo);
assertEquals(reservationInfo.getReservationId(), applyReservationId);
}
@Test
public void testUpdateReservation() throws Exception {
// submit reservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
Response response = submitReservation(reservationId);
assertNotNull(response);
assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
// update reservation
ReservationSubmissionRequest resSubRequest =
getReservationSubmissionRequest(reservationId, 6, 2048, 2);
ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
ReservationDefinitionInfo reservationDefinitionInfo =
new ReservationDefinitionInfo(reservationDefinition);
ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo();
updateRequestInfo.setReservationId(reservationId.toString());
updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null);
assertNotNull(updateReservationResp);
assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus());
String applyReservationId = reservationId.toString();
Response reservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
assertNotNull(reservationResponse);
Object entity = reservationResponse.getEntity();
assertNotNull(entity);
assertNotNull(entity instanceof ReservationListInfo);
assertTrue(entity instanceof ReservationListInfo);
ReservationListInfo listInfo = (ReservationListInfo) entity;
assertNotNull(listInfo);
List<ReservationInfo> reservationInfos = listInfo.getReservations();
assertNotNull(reservationInfos);
assertEquals(1, reservationInfos.size());
ReservationInfo reservationInfo = reservationInfos.get(0);
assertNotNull(reservationInfo);
assertEquals(reservationInfo.getReservationId(), applyReservationId);
ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition();
assertNotNull(resDefinitionInfo);
ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests();
assertNotNull(reservationRequestsInfo);
ArrayList<ReservationRequestInfo> reservationRequestInfoList =
reservationRequestsInfo.getReservationRequest();
assertNotNull(reservationRequestInfoList);
assertEquals(1, reservationRequestInfoList.size());
ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0);
assertNotNull(reservationRequestInfo);
assertEquals(6, reservationRequestInfo.getNumContainers());
ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
assertNotNull(resourceInfo);
int vCore = resourceInfo.getvCores();
long memory = resourceInfo.getMemorySize();
assertEquals(2, vCore);
assertEquals(2048, memory);
}
@Test
public void testDeleteReservation() throws Exception {
// submit reservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
Response response = submitReservation(reservationId);
assertNotNull(response);
assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
String applyResId = reservationId.toString();
Response reservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
assertNotNull(reservationResponse);
ReservationDeleteRequestInfo deleteRequestInfo =
new ReservationDeleteRequestInfo();
deleteRequestInfo.setReservationId(applyResId);
Response delResponse = interceptor.deleteReservation(deleteRequestInfo, null);
assertNotNull(delResponse);
NotFoundException exception = assertThrows(NotFoundException.class, () -> {
interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
});
String stackTraceAsString = getStackTraceAsString(exception);
assertTrue(stackTraceAsString.contains("reservationId with id: " +
reservationId + " not found"));
}
private Response submitReservation(ReservationId reservationId)
throws IOException, InterruptedException {
ReservationSubmissionRequestInfo resSubmissionRequestInfo =
getReservationSubmissionRequestInfo(reservationId);
return interceptor.submitReservation(resSubmissionRequestInfo, null);
}
public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
ReservationId reservationId) {
ReservationSubmissionRequest resSubRequest =
getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1);
ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
ReservationSubmissionRequestInfo resSubmissionRequestInfo =
new ReservationSubmissionRequestInfo();
resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
resSubmissionRequestInfo.setReservationId(reservationId.toString());
ReservationDefinitionInfo reservationDefinitionInfo =
new ReservationDefinitionInfo(reservationDefinition);
resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
return resSubmissionRequestInfo;
}
public static ReservationSubmissionRequest getReservationSubmissionRequest(
ReservationId reservationId, int numContainers, int memory, int vcore) {
// arrival time from which the resource(s) can be allocated.
long arrival = Time.now();
// deadline by when the resource(s) must be allocated.
// The reason for choosing 1.05 is that this gives an integer
// DURATION * 0.05 = 3000(ms)
// deadline = arrival + 3000ms
long deadline = (long) (arrival + 1.05 * DURATION);
return createSimpleReservationRequest(
reservationId, numContainers, arrival, deadline, DURATION, memory, vcore);
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration, int memory, int vcore) {
// create a request with a single atomic ask
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(memory, vcore), numContainers, 1, duration);
ReservationRequests reqs = ReservationRequests.newInstance(
Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
ReservationDefinition rDef = ReservationDefinition.newInstance(
arrival, deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED);
return ReservationSubmissionRequest.newInstance(rDef, QUEUE_DEDICATED_FULL, reservationId);
}
@Test
public void testWebAddressWithScheme() {
// The style of the web address reported by the subCluster in the heartbeat is 0.0.0.0:8000
// We design the following 2 test cases:
String webAppAddress = "0.0.0.0:8000";
// 1. We try to disable Https, at this point we should get the following link:
// http://0.0.0.0:8000
String expectedHttpWebAddress = "http://0.0.0.0:8000";
String webAppAddressWithScheme =
WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress;
assertEquals(expectedHttpWebAddress, webAppAddressWithScheme);
// 2. We try to enable Https���at this point we should get the following link:
// https://0.0.0.0:8000
Configuration configuration = this.getConf();
configuration.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTPS_ONLY.name());
String expectedHttpsWebAddress = "https://0.0.0.0:8000";
String webAppAddressWithScheme2 =
WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress;
assertEquals(expectedHttpsWebAddress, webAppAddressWithScheme2);
}
@Test
public void testCheckUserAccessToQueue() throws Exception {
interceptor.setAllowPartialResult(false);
// Case 1: Only queue admin user can access other user's information
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin");
String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " +
"so it cannot check ACLs for other users.";
RuntimeException exception = assertThrows(RuntimeException.class, () -> {
interceptor.checkUserAccessToQueue("queue", "jack",
QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr);
});
String stackTraceAsString = getStackTraceAsString(exception);
assertTrue(stackTraceAsString.contains(errorMsg1));
// Case 2: request an unknown ACL causes BAD_REQUEST
HttpServletRequest mockHsr1 = mockHttpServletRequestByUserName("admin");
String errorMsg2 = "Specified queueAclType=XYZ_ACL is not a valid type, " +
"valid queue acl types={SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}";
RuntimeException exception2 = assertThrows(RuntimeException.class, () -> {
interceptor.checkUserAccessToQueue("queue", "jack",
"XYZ_ACL", mockHsr1);
});
String stackTraceAsString2 = getStackTraceAsString(exception2);
assertTrue(stackTraceAsString2.contains(errorMsg2));
// We design a test, admin user has ADMINISTER_QUEUE, SUBMIT_APPLICATIONS permissions,
// yarn user has SUBMIT_APPLICATIONS permissions, other users have no permissions
// Case 3: get FORBIDDEN for rejected ACL
checkUserAccessToQueueFailed("queue", "jack", QueueACL.SUBMIT_APPLICATIONS, "admin");
checkUserAccessToQueueFailed("queue", "jack", QueueACL.ADMINISTER_QUEUE, "admin");
// Case 4: get OK for listed ACLs
checkUserAccessToQueueSuccess("queue", "admin", QueueACL.ADMINISTER_QUEUE, "admin");
checkUserAccessToQueueSuccess("queue", "admin", QueueACL.SUBMIT_APPLICATIONS, "admin");
// Case 5: get OK only for SUBMIT_APP acl for "yarn" user
checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin");
checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin");
interceptor.setAllowPartialResult(true);
}
private void checkUserAccessToQueueSuccess(String queue, String userName,
QueueACL queueACL, String mockUser) throws AuthorizationException {
HttpServletRequest mockHsr = mockHttpServletRequestByUserName(mockUser);
RMQueueAclInfo aclInfo =
interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr);
assertNotNull(aclInfo);
assertTrue(aclInfo instanceof FederationRMQueueAclInfo);
FederationRMQueueAclInfo fedAclInfo = (FederationRMQueueAclInfo) aclInfo;
List<RMQueueAclInfo> aclInfos = fedAclInfo.getList();
assertNotNull(aclInfos);
assertEquals(4, aclInfos.size());
for (RMQueueAclInfo rMQueueAclInfo : aclInfos) {
assertTrue(rMQueueAclInfo.isAllowed());
}
}
private void checkUserAccessToQueueFailed(String queue, String userName,
QueueACL queueACL, String mockUser) throws AuthorizationException {
HttpServletRequest mockHsr = mockHttpServletRequestByUserName(mockUser);
RMQueueAclInfo aclInfo =
interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr);
assertNotNull(aclInfo);
assertTrue(aclInfo instanceof FederationRMQueueAclInfo);
FederationRMQueueAclInfo fedAclInfo = (FederationRMQueueAclInfo) aclInfo;
List<RMQueueAclInfo> aclInfos = fedAclInfo.getList();
assertNotNull(aclInfos);
assertEquals(4, aclInfos.size());
for (RMQueueAclInfo rMQueueAclInfo : aclInfos) {
assertFalse(rMQueueAclInfo.isAllowed());
String expectDiagnostics = "User=" + userName +
" doesn't have access to queue=queue with acl-type=" + queueACL.name();
assertEquals(expectDiagnostics, rMQueueAclInfo.getDiagnostics());
}
}
private HttpServletRequest mockHttpServletRequestByUserName(String username) {
HttpServletRequest mockHsr = mock(HttpServletRequest.class);
when(mockHsr.getRemoteUser()).thenReturn(username);
Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(username);
when(mockHsr.getUserPrincipal()).thenReturn(principal);
return mockHsr;
}
@Test
public void testCheckFederationInterceptorRESTClient() {
SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
String webAppSocket = "SC-1:WebAddress";
String webAppAddress = "http://" + webAppSocket;
Configuration configuration = new Configuration();
FederationInterceptorREST rest = new FederationInterceptorREST();
rest.setConf(configuration);
rest.init("router");
DefaultRequestInterceptorREST interceptorREST =
rest.getOrCreateInterceptorForSubCluster(subClusterId, webAppSocket);
assertNotNull(interceptorREST);
assertNotNull(interceptorREST.getClient());
assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
}
@Test
public void testInvokeConcurrent() throws IOException, YarnException {
// We design such a test case, we call the interceptor's getNodes interface,
// this interface will generate the following test data
// subCluster0 Node 0
// subCluster1 Node 1
// subCluster2 Node 2
// subCluster3 Node 3
// We use the returned data to verify whether the subClusterId
// of the multi-thread call can match the node data
Map<SubClusterInfo, NodesInfo> subClusterInfoNodesInfoMap =
interceptor.invokeConcurrentGetNodeLabel();
assertNotNull(subClusterInfoNodesInfoMap);
assertEquals(4, subClusterInfoNodesInfoMap.size());
subClusterInfoNodesInfoMap.forEach((subClusterInfo, nodesInfo) -> {
String subClusterId = subClusterInfo.getSubClusterId().getId();
List<NodeInfo> nodeInfos = nodesInfo.getNodes();
assertNotNull(nodeInfos);
assertEquals(1, nodeInfos.size());
String expectNodeId = "Node " + subClusterId;
String nodeId = nodeInfos.get(0).getNodeId();
assertEquals(expectNodeId, nodeId);
});
}
@Test
public void testGetSchedulerInfo() {
// In this test case, we will get the return results of 4 sub-clusters.
SchedulerTypeInfo typeInfo = interceptor.getSchedulerInfo();
assertNotNull(typeInfo);
assertTrue(typeInfo instanceof FederationSchedulerTypeInfo);
FederationSchedulerTypeInfo federationSchedulerTypeInfo =
(FederationSchedulerTypeInfo) typeInfo;
assertNotNull(federationSchedulerTypeInfo);
List<SchedulerTypeInfo> schedulerTypeInfos = federationSchedulerTypeInfo.getList();
assertNotNull(schedulerTypeInfos);
assertEquals(4, schedulerTypeInfos.size());
List<String> subClusterIds = subClusters.stream().map(SubClusterId::getId).
collect(Collectors.toList());
for (SchedulerTypeInfo schedulerTypeInfo : schedulerTypeInfos) {
assertNotNull(schedulerTypeInfo);
// 1. Whether the returned subClusterId is in the subCluster list
String subClusterId = schedulerTypeInfo.getSubClusterId();
assertTrue(subClusterIds.contains(subClusterId));
// 2. We test CapacityScheduler, the returned type should be CapacityScheduler.
SchedulerInfo schedulerInfo = schedulerTypeInfo.getSchedulerInfo();
assertNotNull(schedulerInfo);
assertTrue(schedulerInfo instanceof CapacitySchedulerInfo);
CapacitySchedulerInfo capacitySchedulerInfo = (CapacitySchedulerInfo) schedulerInfo;
assertNotNull(capacitySchedulerInfo);
// 3. The parent queue name should be root
String queueName = capacitySchedulerInfo.getQueueName();
assertEquals("root", queueName);
// 4. schedulerType should be CapacityScheduler
String schedulerType = capacitySchedulerInfo.getSchedulerType();
assertEquals("Capacity Scheduler", schedulerType);
// 5. queue path should be root
String queuePath = capacitySchedulerInfo.getQueuePath();
assertEquals("root", queuePath);
// 6. mockRM has 2 test queues, [root.a, root.b]
List<String> queues = Lists.newArrayList("root.a", "root.b");
CapacitySchedulerQueueInfoList csSchedulerQueueInfoList = capacitySchedulerInfo.getQueues();
assertNotNull(csSchedulerQueueInfoList);
List<CapacitySchedulerQueueInfo> csQueueInfoList =
csSchedulerQueueInfoList.getQueueInfoList();
assertEquals(2, csQueueInfoList.size());
for (CapacitySchedulerQueueInfo csQueueInfo : csQueueInfoList) {
assertNotNull(csQueueInfo);
assertTrue(queues.contains(csQueueInfo.getQueuePath()));
}
}
}
@Test
public void testPostDelegationTokenErrorHsr() throws Exception {
// Prepare delegationToken data
DelegationToken token = new DelegationToken();
token.setRenewer(TEST_RENEWER);
HttpServletRequest request = mock(HttpServletRequest.class);
// If we don't set token
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the tokenData or hsr is null.",
() -> interceptor.postDelegationToken(null, request));
// If we don't set hsr
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the tokenData or hsr is null.",
() -> interceptor.postDelegationToken(token, null));
// If we don't set renewUser, we will get error message.
LambdaTestUtils.intercept(AuthorizationException.class,
"Unable to obtain user name, user not authenticated",
() -> interceptor.postDelegationToken(token, request));
Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(TEST_RENEWER);
when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
when(request.getUserPrincipal()).thenReturn(principal);
// If we don't set the authentication type, we will get error message.
Response response = interceptor.postDelegationToken(token, request);
assertNotNull(response);
assertEquals(response.getStatus(), Status.FORBIDDEN.getStatusCode());
String errMsg = "Delegation token operations can only be carried out on a " +
"Kerberos authenticated channel. Expected auth type is kerberos, got type null";
Object entity = response.getEntity();
assertNotNull(entity);
assertTrue(entity instanceof String);
String entityMsg = String.valueOf(entity);
assertTrue(errMsg.contains(entityMsg));
}
@Test
public void testPostDelegationToken() throws Exception {
Long now = Time.now();
DelegationToken token = new DelegationToken();
token.setRenewer(TEST_RENEWER);
Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(TEST_RENEWER);
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
when(request.getUserPrincipal()).thenReturn(principal);
when(request.getAuthType()).thenReturn("kerberos");
Response response = interceptor.postDelegationToken(token, request);
assertNotNull(response);
Object entity = response.getEntity();
assertNotNull(entity);
assertTrue(entity instanceof DelegationToken);
DelegationToken dtoken = (DelegationToken) entity;
assertEquals(TEST_RENEWER, dtoken.getRenewer());
assertEquals(TEST_RENEWER, dtoken.getOwner());
assertEquals("RM_DELEGATION_TOKEN", dtoken.getKind());
assertNotNull(dtoken.getToken());
assertTrue(dtoken.getNextExpirationTime() > now);
}
@Test
public void testPostDelegationTokenExpirationError() throws Exception {
// If we don't set hsr
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the hsr is null.",
() -> interceptor.postDelegationTokenExpiration(null));
Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(TEST_RENEWER);
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
when(request.getUserPrincipal()).thenReturn(principal);
when(request.getAuthType()).thenReturn("kerberos");
// If we don't set the header.
String errorMsg = "Header 'Hadoop-YARN-RM-Delegation-Token' containing encoded token not found";
BadRequestException badRequestException = assertThrows(BadRequestException.class, () -> {
interceptor.postDelegationTokenExpiration(request);
});
String stackTraceAsString = getStackTraceAsString(badRequestException);
assertTrue(stackTraceAsString.contains(errorMsg));
}
@Test
public void testPostDelegationTokenExpiration() throws Exception {
DelegationToken token = new DelegationToken();
token.setRenewer(TEST_RENEWER);
Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(TEST_RENEWER);
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
when(request.getUserPrincipal()).thenReturn(principal);
when(request.getAuthType()).thenReturn("kerberos");
Response response = interceptor.postDelegationToken(token, request);
assertNotNull(response);
Object entity = response.getEntity();
assertNotNull(entity);
assertTrue(entity instanceof DelegationToken);
DelegationToken dtoken = (DelegationToken) entity;
final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token";
when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken());
Response renewResponse = interceptor.postDelegationTokenExpiration(request);
assertNotNull(renewResponse);
Object renewEntity = renewResponse.getEntity();
assertNotNull(renewEntity);
assertTrue(renewEntity instanceof DelegationToken);
// renewDelegation, we only return renewDate, other values are NULL.
DelegationToken renewDToken = (DelegationToken) renewEntity;
assertNull(renewDToken.getRenewer());
assertNull(renewDToken.getOwner());
assertNull(renewDToken.getKind());
assertTrue(renewDToken.getNextExpirationTime() > dtoken.getNextExpirationTime());
}
@Test
public void testCancelDelegationToken() throws Exception {
DelegationToken token = new DelegationToken();
token.setRenewer(TEST_RENEWER);
Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(TEST_RENEWER);
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
when(request.getUserPrincipal()).thenReturn(principal);
when(request.getAuthType()).thenReturn("kerberos");
Response response = interceptor.postDelegationToken(token, request);
assertNotNull(response);
Object entity = response.getEntity();
assertNotNull(entity);
assertTrue(entity instanceof DelegationToken);
DelegationToken dtoken = (DelegationToken) entity;
final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token";
when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken());
Response cancelResponse = interceptor.cancelDelegationToken(request);
assertNotNull(cancelResponse);
assertEquals(response.getStatus(), Status.OK.getStatusCode());
}
@Test
public void testReplaceLabelsOnNodes() throws Exception {
// subCluster0 -> node0:0 -> label:NodeLabel0
// subCluster1 -> node1:1 -> label:NodeLabel1
// subCluster2 -> node2:2 -> label:NodeLabel2
// subCluster3 -> node3:3 -> label:NodeLabel3
NodeToLabelsEntryList nodeToLabelsEntryList = new NodeToLabelsEntryList();
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
// labels
List<String> labels = new ArrayList<>();
labels.add("NodeLabel" + i);
// nodes
String nodeId = "node" + i + ":" + i;
NodeToLabelsEntry nodeToLabelsEntry = new NodeToLabelsEntry(nodeId, labels);
List<NodeToLabelsEntry> nodeToLabelsEntries = nodeToLabelsEntryList.getNodeToLabels();
nodeToLabelsEntries.add(nodeToLabelsEntry);
}
// one of the results:
// subCluster#0:Success;subCluster#1:Success;subCluster#3:Success;subCluster#2:Success;
// We can't confirm the complete return order.
Response response = interceptor.replaceLabelsOnNodes(nodeToLabelsEntryList, null);
assertNotNull(response);
assertEquals(200, response.getStatus());
Object entityObject = response.getEntity();
assertNotNull(entityObject);
String entityValue = String.valueOf(entityObject);
String[] entities = entityValue.split(",");
assertNotNull(entities);
assertEquals(4, entities.length);
String expectValue =
"subCluster-0:Success,subCluster-1:Success,subCluster-2:Success,subCluster-3:Success,";
for (String entity : entities) {
assertTrue(expectValue.contains(entity));
}
}
@Test
public void testReplaceLabelsOnNodesError() throws Exception {
// newNodeToLabels is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, newNodeToLabels must not be empty.",
() -> interceptor.replaceLabelsOnNodes(null, null));
// nodeToLabelsEntryList is Empty
NodeToLabelsEntryList nodeToLabelsEntryList = new NodeToLabelsEntryList();
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, nodeToLabelsEntries must not be empty.",
() -> interceptor.replaceLabelsOnNodes(nodeToLabelsEntryList, null));
}
@Test
@Order(1)
public void testReplaceLabelsOnNode() throws Exception {
// subCluster3 -> node3:3 -> label:NodeLabel3
String nodeId = "node3:3";
Set<String> labels = Collections.singleton("NodeLabel3");
// We expect the following result: subCluster#3:Success;
String expectValue = "subCluster#3:Success;";
Response response = interceptor.replaceLabelsOnNode(labels, null, nodeId);
assertNotNull(response);
assertEquals(200, response.getStatus());
Object entityObject = response.getEntity();
assertNotNull(entityObject);
String entityValue = String.valueOf(entityObject);
assertNotNull(entityValue);
assertEquals(expectValue, entityValue);
}
@Test
public void testReplaceLabelsOnNodeError() throws Exception {
// newNodeToLabels is null
String nodeId = "node3:3";
Set<String> labels = Collections.singleton("NodeLabel3");
Set<String> labelsEmpty = new HashSet<>();
// nodeId is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, nodeId must not be null or empty.",
() -> interceptor.replaceLabelsOnNode(labels, null, null));
// labels is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, newNodeLabelsName must not be empty.",
() -> interceptor.replaceLabelsOnNode(null, null, nodeId));
// labels is empty
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, newNodeLabelsName must not be empty.",
() -> interceptor.replaceLabelsOnNode(labelsEmpty, null, nodeId));
}
@Test
public void testDumpSchedulerLogs() throws Exception {
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("admin");
String dumpSchedulerLogsMsg = interceptor.dumpSchedulerLogs("1", mockHsr);
// We cannot guarantee the calling order of the sub-clusters,
// We guarantee that the returned result contains the information of each subCluster.
assertNotNull(dumpSchedulerLogsMsg);
subClusters.forEach(subClusterId -> {
String subClusterMsg =
"subClusterId" + subClusterId + " : Capacity scheduler logs are being created.; ";
assertTrue(dumpSchedulerLogsMsg.contains(subClusterMsg));
});
}
@Test
public void testDumpSchedulerLogsError() throws Exception {
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("admin");
// time is empty
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the time is empty or null.",
() -> interceptor.dumpSchedulerLogs(null, mockHsr));
// time is negative
LambdaTestUtils.intercept(IllegalArgumentException.class,
"time must be greater than 0.",
() -> interceptor.dumpSchedulerLogs("-1", mockHsr));
// time is non-numeric
LambdaTestUtils.intercept(IllegalArgumentException.class,
"time must be a number.",
() -> interceptor.dumpSchedulerLogs("abc", mockHsr));
}
@Test
public void testGetActivitiesNormal() {
ActivitiesInfo activitiesInfo = interceptor.getActivities(null, "1", "DIAGNOSTIC");
assertNotNull(activitiesInfo);
String nodeId = activitiesInfo.getNodeId();
assertNotNull(nodeId);
assertEquals("1", nodeId);
String diagnostic = activitiesInfo.getDiagnostic();
assertNotNull(diagnostic);
assertTrue(StringUtils.contains(diagnostic, "Diagnostic"));
long timestamp = activitiesInfo.getTimestamp();
assertEquals(1673081972L, timestamp);
List<NodeAllocationInfo> allocationInfos = activitiesInfo.getAllocations();
assertNotNull(allocationInfos);
assertEquals(1, allocationInfos.size());
}
@Test
public void testGetActivitiesError() throws Exception {
// nodeId is empty
LambdaTestUtils.intercept(IllegalArgumentException.class,
"'nodeId' must not be empty.",
() -> interceptor.getActivities(null, "", "DIAGNOSTIC"));
// groupBy is empty
LambdaTestUtils.intercept(IllegalArgumentException.class,
"'groupBy' must not be empty.",
() -> interceptor.getActivities(null, "1", ""));
// groupBy value is wrong
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Got invalid groupBy: TEST1, valid groupBy types: [DIAGNOSTIC]",
() -> interceptor.getActivities(null, "1", "TEST1"));
}
@Test
public void testGetBulkActivitiesNormal() throws InterruptedException {
BulkActivitiesInfo bulkActivitiesInfo =
interceptor.getBulkActivities(null, "DIAGNOSTIC", 5);
assertNotNull(bulkActivitiesInfo);
assertTrue(bulkActivitiesInfo instanceof FederationBulkActivitiesInfo);
FederationBulkActivitiesInfo federationBulkActivitiesInfo =
(FederationBulkActivitiesInfo) bulkActivitiesInfo;
assertNotNull(federationBulkActivitiesInfo);
List<BulkActivitiesInfo> activitiesInfos = federationBulkActivitiesInfo.getList();
assertNotNull(activitiesInfos);
assertEquals(4, activitiesInfos.size());
for (BulkActivitiesInfo activitiesInfo : activitiesInfos) {
assertNotNull(activitiesInfo);
List<ActivitiesInfo> activitiesInfoList = activitiesInfo.getActivities();
assertNotNull(activitiesInfoList);
assertEquals(5, activitiesInfoList.size());
}
}
@Test
public void testGetBulkActivitiesError() throws Exception {
// activitiesCount < 0
LambdaTestUtils.intercept(IllegalArgumentException.class,
"'activitiesCount' must not be negative.",
() -> interceptor.getBulkActivities(null, "DIAGNOSTIC", -1));
// groupBy value is wrong
LambdaTestUtils.intercept(YarnRuntimeException.class,
"Got invalid groupBy: TEST1, valid groupBy types: [DIAGNOSTIC]",
() -> interceptor.getBulkActivities(null, "TEST1", 1));
// groupBy is empty
LambdaTestUtils.intercept(IllegalArgumentException.class,
"'groupBy' must not be empty.",
() -> interceptor.getBulkActivities(null, "", 1));
}
@Test
public void testAddToClusterNodeLabels1() throws Exception {
// In this test, we try to add ALL label, all subClusters will return success.
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
NodeLabelInfo nodeLabelInfo = new NodeLabelInfo("ALL", true);
nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
Response response = interceptor.addToClusterNodeLabels(nodeLabelsInfo, null);
assertNotNull(response);
Object entityObj = response.getEntity();
assertNotNull(entityObj);
String entity = String.valueOf(entityObj);
String[] entities = StringUtils.split(entity, ",");
assertNotNull(entities);
assertEquals(4, entities.length);
// The order in which the cluster returns messages is uncertain,
// we confirm the result by contains
String expectedMsg =
"SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
Arrays.stream(entities).forEach(item -> assertTrue(expectedMsg.contains(item)));
}
@Test
public void testAddToClusterNodeLabels2() throws Exception {
// In this test, we try to add A0 label,
// subCluster0 will return success, and other subClusters will return null
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
NodeLabelInfo nodeLabelInfo = new NodeLabelInfo("A0", true);
nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
Response response = interceptor.addToClusterNodeLabels(nodeLabelsInfo, null);
assertNotNull(response);
Object entityObj = response.getEntity();
assertNotNull(entityObj);
String expectedValue = "SubCluster-0:SUCCESS,";
String entity = String.valueOf(entityObj);
assertTrue(entity.contains(expectedValue));
}
@Test
public void testAddToClusterNodeLabelsError() throws Exception {
// the newNodeLabels is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the newNodeLabels is null.",
() -> interceptor.addToClusterNodeLabels(null, null));
// the nodeLabelsInfo is null
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the nodeLabelsInfo is null or empty.",
() -> interceptor.addToClusterNodeLabels(nodeLabelsInfo, null));
// error nodeLabelsInfo
NodeLabelsInfo nodeLabelsInfo1 = new NodeLabelsInfo();
NodeLabelInfo nodeLabelInfo1 = new NodeLabelInfo("A", true);
nodeLabelsInfo1.getNodeLabelsInfo().add(nodeLabelInfo1);
LambdaTestUtils.intercept(YarnRuntimeException.class, "addToClusterNodeLabels Error",
() -> interceptor.addToClusterNodeLabels(nodeLabelsInfo1, null));
}
@Test
public void testRemoveFromClusterNodeLabels1() throws Exception {
Set<String> oldNodeLabels = Sets.newHashSet();
oldNodeLabels.add("ALL");
Response response = interceptor.removeFromClusterNodeLabels(oldNodeLabels, null);
assertNotNull(response);
Object entityObj = response.getEntity();
assertNotNull(entityObj);
String entity = String.valueOf(entityObj);
String[] entities = StringUtils.split(entity, ",");
assertNotNull(entities);
assertEquals(4, entities.length);
// The order in which the cluster returns messages is uncertain,
// we confirm the result by contains
String expectedMsg =
"SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
Arrays.stream(entities).forEach(item -> assertTrue(expectedMsg.contains(item)));
}
@Test
public void testRemoveFromClusterNodeLabels2() throws Exception {
Set<String> oldNodeLabels = Sets.newHashSet();
oldNodeLabels.add("A0");
Response response = interceptor.removeFromClusterNodeLabels(oldNodeLabels, null);
assertNotNull(response);
Object entityObj = response.getEntity();
assertNotNull(entityObj);
String expectedValue = "SubCluster-0:SUCCESS,";
String entity = String.valueOf(entityObj);
assertTrue(entity.contains(expectedValue));
}
@Test
public void testRemoveFromClusterNodeLabelsError() throws Exception {
// the oldNodeLabels is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the oldNodeLabels is null or empty.",
() -> interceptor.removeFromClusterNodeLabels(null, null));
// the oldNodeLabels is empty
Set<String> oldNodeLabels = Sets.newHashSet();
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the oldNodeLabels is null or empty.",
() -> interceptor.removeFromClusterNodeLabels(oldNodeLabels, null));
// error oldNodeLabels
Set<String> oldNodeLabels1 = Sets.newHashSet();
oldNodeLabels1.add("A1");
LambdaTestUtils.intercept(YarnRuntimeException.class, "removeFromClusterNodeLabels Error",
() -> interceptor.removeFromClusterNodeLabels(oldNodeLabels1, null));
}
@Test
public void testGetSchedulerConfiguration() throws Exception {
Response response = interceptor.getSchedulerConfiguration(null);
assertNotNull(response);
assertEquals(OK, response.getStatus());
Object entity = response.getEntity();
assertNotNull(entity);
assertTrue(entity instanceof FederationConfInfo);
FederationConfInfo federationConfInfo = FederationConfInfo.class.cast(entity);
List<ConfInfo> confInfos = federationConfInfo.getList();
assertNotNull(confInfos);
assertEquals(4, confInfos.size());
List<String> errors = federationConfInfo.getErrorMsgs();
assertEquals(0, errors.size());
Set<String> subClusterSet = subClusters.stream()
.map(subClusterId -> subClusterId.getId()).collect(Collectors.toSet());
for (ConfInfo confInfo : confInfos) {
List<ConfInfo.ConfItem> confItems = confInfo.getItems();
assertNotNull(confItems);
assertTrue(confItems.size() > 0);
assertTrue(subClusterSet.contains(confInfo.getSubClusterId()));
}
}
@Test
public void testGetClusterUserInfo() {
String requestUserName = "test-user";
HttpServletRequest hsr = mock(HttpServletRequest.class);
when(hsr.getRemoteUser()).thenReturn(requestUserName);
ClusterUserInfo clusterUserInfo = interceptor.getClusterUserInfo(hsr);
assertNotNull(clusterUserInfo);
assertTrue(clusterUserInfo instanceof FederationClusterUserInfo);
FederationClusterUserInfo federationClusterUserInfo =
(FederationClusterUserInfo) clusterUserInfo;
List<ClusterUserInfo> fedClusterUserInfoList = federationClusterUserInfo.getList();
assertNotNull(fedClusterUserInfoList);
assertEquals(4, fedClusterUserInfoList.size());
List<String> subClusterIds = subClusters.stream().map(
subClusterId -> subClusterId.getId()).collect(Collectors.toList());
MockRM mockRM = interceptor.getMockRM();
for (ClusterUserInfo fedClusterUserInfo : fedClusterUserInfoList) {
// Check subClusterId
String subClusterId = fedClusterUserInfo.getSubClusterId();
assertNotNull(subClusterId);
assertTrue(subClusterIds.contains(subClusterId));
// Check requestedUser
String requestedUser = fedClusterUserInfo.getRequestedUser();
assertNotNull(requestedUser);
assertEquals(requestUserName, requestedUser);
// Check rmLoginUser
String rmLoginUser = fedClusterUserInfo.getRmLoginUser();
assertNotNull(rmLoginUser);
assertEquals(mockRM.getRMLoginUser(), rmLoginUser);
}
}
@Test
public void testUpdateSchedulerConfigurationErrorMsg() throws Exception {
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the subClusterId is empty or null.",
() -> interceptor.updateSchedulerConfiguration(mutationInfo, null));
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the schedConfUpdateInfo is empty or null.",
() -> interceptor.updateSchedulerConfiguration(null, null));
}
@Test
public void testUpdateSchedulerConfiguration()
throws AuthorizationException, InterruptedException {
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.setSubClusterId("1");
Map<String, String> goodUpdateMap = new HashMap<>();
goodUpdateMap.put("goodKey", "goodVal");
QueueConfigInfo goodUpdateInfo = new
QueueConfigInfo("root.default", goodUpdateMap);
updateInfo.getUpdateQueueInfo().add(goodUpdateInfo);
Response response = interceptor.updateSchedulerConfiguration(updateInfo, null);
assertNotNull(response);
assertEquals(OK, response.getStatus());
String expectMsg = "Configuration change successfully applied.";
Object entity = response.getEntity();
assertNotNull(entity);
String entityMsg = String.valueOf(entity);
assertEquals(expectMsg, entityMsg);
}
@Test
public void testGetClusterInfo() {
ClusterInfo clusterInfos = interceptor.getClusterInfo();
assertNotNull(clusterInfos);
assertTrue(clusterInfos instanceof FederationClusterInfo);
FederationClusterInfo federationClusterInfos =
(FederationClusterInfo) (clusterInfos);
List<ClusterInfo> fedClusterInfosList = federationClusterInfos.getList();
assertNotNull(fedClusterInfosList);
assertEquals(4, fedClusterInfosList.size());
List<String> subClusterIds = subClusters.stream().map(
subClusterId -> subClusterId.getId()).collect(Collectors.toList());
MockRM mockRM = interceptor.getMockRM();
String yarnVersion = YarnVersionInfo.getVersion();
for (ClusterInfo clusterInfo : fedClusterInfosList) {
String subClusterId = clusterInfo.getSubClusterId();
// Check subClusterId
assertTrue(subClusterIds.contains(subClusterId));
// Check state
String clusterState = mockRM.getServiceState().toString();
assertEquals(clusterState, clusterInfo.getState());
// Check rmStateStoreName
String rmStateStoreName =
mockRM.getRMContext().getStateStore().getClass().getName();
assertEquals(rmStateStoreName, clusterInfo.getRMStateStore());
// Check RM Version
assertEquals(yarnVersion, clusterInfo.getRMVersion());
// Check haZooKeeperConnectionState
String rmHAZookeeperConnectionState = mockRM.getRMContext().getHAZookeeperConnectionState();
assertEquals(rmHAZookeeperConnectionState,
clusterInfo.getHAZookeeperConnectionState());
}
}
private String getStackTraceAsString(Exception e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return sw.toString();
}
}