TestFederationRMAdminInterceptor.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetSubClustersRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetSubClustersResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.FederationSubCluster;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Extends the FederationRMAdminInterceptor and overrides methods to provide a
* testable implementation of FederationRMAdminInterceptor.
*/
public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationRMAdminInterceptor.class);
////////////////////////////////
// constant information
////////////////////////////////
private final static String USER_NAME = "test-user";
private final static int NUM_SUBCLUSTER = 4;
private final static int GB = 1024;
private TestableFederationRMAdminInterceptor interceptor;
private FederationStateStoreFacade facade;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private List<SubClusterId> subClusters;
@Override
public void setUp() {
super.setUpConfig();
// Initialize facade & stateSore
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
facade = FederationStateStoreFacade.getInstance(this.getConf());
facade.reinitialize(stateStore, this.getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
// Initialize interceptor
interceptor = new TestableFederationRMAdminInterceptor();
interceptor.setConf(this.getConf());
interceptor.init(USER_NAME);
// Storage SubClusters
subClusters = new ArrayList<>();
try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
SubClusterId sc = SubClusterId.newInstance("SC-" + i);
stateStoreUtil.registerSubCluster(sc);
subClusters.add(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
DefaultMetricsSystem.setMiniClusterMode(true);
}
@Override
protected YarnConfiguration createConfiguration() {
// Set Enable YarnFederation
YarnConfiguration config = new YarnConfiguration();
config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughRMAdminRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," +
TestFederationRMAdminInterceptor.class.getName());
config.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
return config;
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
@Test
public void testRefreshQueues() throws Exception {
// We will test 2 cases:
// case 1, request is null.
// case 2, normal request.
// If the request is null, a Missing RefreshQueues request exception will be thrown.
// null request.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshQueues request.",
() -> interceptor.refreshQueues(null));
// normal request.
RefreshQueuesRequest request = RefreshQueuesRequest.newInstance();
RefreshQueuesResponse response = interceptor.refreshQueues(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshQueues() throws Exception {
// We will test 2 cases:
// case 1, test the existing subCluster (SC-1).
// case 2, test the non-exist subCluster.
String existSubCluster = "SC-1";
RefreshQueuesRequest request = RefreshQueuesRequest.newInstance(existSubCluster);
interceptor.refreshQueues(request);
String notExistsSubCluster = "SC-NON";
RefreshQueuesRequest request1 = RefreshQueuesRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(YarnException.class,
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshQueues(request1));
}
@Test
public void testRefreshNodes() throws Exception {
// We will test 2 cases:
// case 1, request is null.
// case 2, normal request.
// If the request is null, a Missing RefreshNodes request exception will be thrown.
// null request.
LambdaTestUtils.intercept(YarnException.class,
"Missing RefreshNodes request.", () -> interceptor.refreshNodes(null));
// normal request.
RefreshNodesRequest request = RefreshNodesRequest.newInstance(DecommissionType.NORMAL);
interceptor.refreshNodes(request);
}
@Test
public void testSC1RefreshNodes() throws Exception {
// We will test 2 cases:
// case 1, test the existing subCluster (SC-1).
// case 2, test the non-exist subCluster.
RefreshNodesRequest request =
RefreshNodesRequest.newInstance(DecommissionType.NORMAL, 10, "SC-1");
interceptor.refreshNodes(request);
String notExistsSubCluster = "SC-NON";
RefreshNodesRequest request1 = RefreshNodesRequest.newInstance(
DecommissionType.NORMAL, 10, notExistsSubCluster);
LambdaTestUtils.intercept(YarnException.class,
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshNodes(request1));
}
@Test
public void testRefreshSuperUserGroupsConfiguration() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class,
"Missing RefreshSuperUserGroupsConfiguration request.",
() -> interceptor.refreshSuperUserGroupsConfiguration(null));
// normal request.
// There is no return information defined in RefreshSuperUserGroupsConfigurationResponse,
// as long as it is not empty, it means that the command is successfully executed.
RefreshSuperUserGroupsConfigurationRequest request =
RefreshSuperUserGroupsConfigurationRequest.newInstance();
RefreshSuperUserGroupsConfigurationResponse response =
interceptor.refreshSuperUserGroupsConfiguration(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshSuperUserGroupsConfiguration() throws Exception {
// case 1, test the existing subCluster (SC-1).
String existSubCluster = "SC-1";
RefreshSuperUserGroupsConfigurationRequest request =
RefreshSuperUserGroupsConfigurationRequest.newInstance(existSubCluster);
RefreshSuperUserGroupsConfigurationResponse response =
interceptor.refreshSuperUserGroupsConfiguration(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
String notExistsSubCluster = "SC-NON";
RefreshSuperUserGroupsConfigurationRequest request1 =
RefreshSuperUserGroupsConfigurationRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(Exception.class,
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshSuperUserGroupsConfiguration(request1));
}
@Test
public void testRefreshUserToGroupsMappings() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class,
"Missing RefreshUserToGroupsMappings request.",
() -> interceptor.refreshUserToGroupsMappings(null));
// normal request.
RefreshUserToGroupsMappingsRequest request = RefreshUserToGroupsMappingsRequest.newInstance();
RefreshUserToGroupsMappingsResponse response = interceptor.refreshUserToGroupsMappings(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshUserToGroupsMappings() throws Exception {
// case 1, test the existing subCluster (SC-1).
String existSubCluster = "SC-1";
RefreshUserToGroupsMappingsRequest request =
RefreshUserToGroupsMappingsRequest.newInstance(existSubCluster);
RefreshUserToGroupsMappingsResponse response =
interceptor.refreshUserToGroupsMappings(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
String notExistsSubCluster = "SC-NON";
RefreshUserToGroupsMappingsRequest request1 =
RefreshUserToGroupsMappingsRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(Exception.class,
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshUserToGroupsMappings(request1));
}
@Test
public void testRefreshAdminAcls() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshAdminAcls request.",
() -> interceptor.refreshAdminAcls(null));
// normal request.
RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance();
RefreshAdminAclsResponse response = interceptor.refreshAdminAcls(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshAdminAcls() throws Exception {
// case 1, test the existing subCluster (SC-1).
String existSubCluster = "SC-1";
RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance(existSubCluster);
RefreshAdminAclsResponse response = interceptor.refreshAdminAcls(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
String notExistsSubCluster = "SC-NON";
RefreshAdminAclsRequest request1 = RefreshAdminAclsRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshAdminAcls(request1));
}
@Test
public void testRefreshServiceAcls() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshServiceAcls request.",
() -> interceptor.refreshServiceAcls(null));
// normal request.
RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance();
RefreshServiceAclsResponse response = interceptor.refreshServiceAcls(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshServiceAcls() throws Exception {
// case 1, test the existing subCluster (SC-1).
String existSubCluster = "SC-1";
RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance(existSubCluster);
RefreshServiceAclsResponse response = interceptor.refreshServiceAcls(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
String notExistsSubCluster = "SC-NON";
RefreshServiceAclsRequest request1 = RefreshServiceAclsRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshServiceAcls(request1));
}
@Test
public void testUpdateNodeResourceEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing UpdateNodeResource request.",
() -> interceptor.updateNodeResource(null));
// null request2.
Map<NodeId, ResourceOption> nodeResourceMap = new HashMap<>();
UpdateNodeResourceRequest request = UpdateNodeResourceRequest.newInstance(nodeResourceMap);
LambdaTestUtils.intercept(YarnException.class, "Missing UpdateNodeResource SubClusterId.",
() -> interceptor.updateNodeResource(request));
}
@Test
public void testUpdateNodeResourceNormalRequest() throws Exception {
// case 1, test the existing subCluster (SC-1).
Map<NodeId, ResourceOption> nodeResourceMap = new HashMap<>();
NodeId nodeId = NodeId.newInstance("127.0.0.1", 1);
ResourceOption resourceOption =
ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1);
nodeResourceMap.put(nodeId, resourceOption);
UpdateNodeResourceRequest request =
UpdateNodeResourceRequest.newInstance(nodeResourceMap, "SC-1");
UpdateNodeResourceResponse response = interceptor.updateNodeResource(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
UpdateNodeResourceRequest request1 =
UpdateNodeResourceRequest.newInstance(nodeResourceMap, "SC-NON");
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.updateNodeResource(request1));
}
@Test
public void testRefreshNodesResourcesEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshNodesResources request.",
() -> interceptor.refreshNodesResources(null));
// null request2.
RefreshNodesResourcesRequest request = RefreshNodesResourcesRequest.newInstance();
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshNodesResources SubClusterId.",
() -> interceptor.refreshNodesResources(request));
}
@Test
public void testRefreshNodesResourcesNormalRequest() throws Exception {
// case 1, test the existing subCluster (SC-1).
RefreshNodesResourcesRequest request = RefreshNodesResourcesRequest.newInstance("SC-1");
RefreshNodesResourcesResponse response = interceptor.refreshNodesResources(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
RefreshNodesResourcesRequest request1 = RefreshNodesResourcesRequest.newInstance("SC-NON");
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshNodesResources(request1));
}
@Test
public void testAddToClusterNodeLabelsEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing AddToClusterNodeLabels request.",
() -> interceptor.addToClusterNodeLabels(null));
// null request2.
AddToClusterNodeLabelsRequest request = AddToClusterNodeLabelsRequest.newInstance(null, null);
LambdaTestUtils.intercept(YarnException.class, "Missing AddToClusterNodeLabels SubClusterId.",
() -> interceptor.addToClusterNodeLabels(request));
}
@Test
public void testAddToClusterNodeLabelsNormalRequest() throws Exception {
// case1, We add NodeLabel to subCluster SC-1
NodeLabel nodeLabelA = NodeLabel.newInstance("a");
NodeLabel nodeLabelB = NodeLabel.newInstance("b");
List<NodeLabel> labels = new ArrayList<>();
labels.add(nodeLabelA);
labels.add(nodeLabelB);
AddToClusterNodeLabelsRequest request =
AddToClusterNodeLabelsRequest.newInstance("SC-1", labels);
AddToClusterNodeLabelsResponse response = interceptor.addToClusterNodeLabels(request);
assertNotNull(response);
// case2, test the non-exist subCluster.
AddToClusterNodeLabelsRequest request1 =
AddToClusterNodeLabelsRequest.newInstance("SC-NON", labels);
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.addToClusterNodeLabels(request1));
}
@Test
public void testRemoveFromClusterNodeLabelsEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing RemoveFromClusterNodeLabels request.",
() -> interceptor.removeFromClusterNodeLabels(null));
// null request2.
RemoveFromClusterNodeLabelsRequest request =
RemoveFromClusterNodeLabelsRequest.newInstance(null, null);
LambdaTestUtils.intercept(YarnException.class,
"Missing RemoveFromClusterNodeLabels SubClusterId.",
() -> interceptor.removeFromClusterNodeLabels(request));
}
@Test
public void testRemoveFromClusterNodeLabelsNormalRequest() throws Exception {
// case1, We add nodelabel a for SC-1, and then remove nodelabel a
// Step1. Add NodeLabel for subCluster SC-1
NodeLabel nodeLabelA = NodeLabel.newInstance("a");
NodeLabel nodeLabelB = NodeLabel.newInstance("b");
List<NodeLabel> nodeLabels = new ArrayList<>();
nodeLabels.add(nodeLabelA);
nodeLabels.add(nodeLabelB);
AddToClusterNodeLabelsRequest request =
AddToClusterNodeLabelsRequest.newInstance("SC-1", nodeLabels);
interceptor.addToClusterNodeLabels(request);
// Step2. We delete the label a of subCluster SC-1
Set<String> labels = new HashSet<>();
labels.add("a");
RemoveFromClusterNodeLabelsRequest request1 =
RemoveFromClusterNodeLabelsRequest.newInstance("SC-1", labels);
RemoveFromClusterNodeLabelsResponse response =
interceptor.removeFromClusterNodeLabels(request1);
assertNotNull(response);
// case2, test the non-exist subCluster.
RemoveFromClusterNodeLabelsRequest request2 =
RemoveFromClusterNodeLabelsRequest.newInstance("SC-NON", labels);
LambdaTestUtils.intercept(YarnException.class,
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.removeFromClusterNodeLabels(request2));
}
@Test
public void testReplaceLabelsOnNodeEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing ReplaceLabelsOnNode request.",
() -> interceptor.replaceLabelsOnNode(null));
// null request2.
Map<NodeId, Set<String>> labelMap = new HashMap<>();
ReplaceLabelsOnNodeRequest request = ReplaceLabelsOnNodeRequest.newInstance(labelMap, null);
LambdaTestUtils.intercept(YarnException.class, "Missing ReplaceLabelsOnNode SubClusterId.",
() -> interceptor.replaceLabelsOnNode(request));
}
@Test
public void tesReplaceLabelsOnNodeEmptyNormalRequest() throws Exception {
// case1, We add nodelabel for SC-1, and then replace the label for the specific node.
NodeLabel nodeLabelA = NodeLabel.newInstance("a");
NodeLabel nodeLabelB = NodeLabel.newInstance("b");
List<NodeLabel> nodeLabels = new ArrayList<>();
nodeLabels.add(nodeLabelA);
nodeLabels.add(nodeLabelB);
AddToClusterNodeLabelsRequest request =
AddToClusterNodeLabelsRequest.newInstance("SC-1", nodeLabels);
interceptor.addToClusterNodeLabels(request);
Map<NodeId, Set<String>> pMap = new HashMap<>();
NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
Set<String> labels = new HashSet<>();
labels.add("a");
pMap.put(nodeId, labels);
ReplaceLabelsOnNodeRequest request1 = ReplaceLabelsOnNodeRequest.newInstance(pMap, "SC-1");
ReplaceLabelsOnNodeResponse response = interceptor.replaceLabelsOnNode(request1);
assertNotNull(response);
// case2, test the non-exist subCluster.
ReplaceLabelsOnNodeRequest request2 =
ReplaceLabelsOnNodeRequest.newInstance(pMap, "SC-NON");
LambdaTestUtils.intercept(YarnException.class,
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.replaceLabelsOnNode(request2));
}
@Test
public void testCheckForDecommissioningNodesRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing checkForDecommissioningNodes request.",
() -> interceptor.checkForDecommissioningNodes(null));
// null request2.
CheckForDecommissioningNodesRequest request =
CheckForDecommissioningNodesRequest.newInstance(null);
LambdaTestUtils.intercept(YarnException.class,
"Missing checkForDecommissioningNodes SubClusterId.",
() -> interceptor.checkForDecommissioningNodes(request));
}
@Test
public void testCheckForDecommissioningNodesNormalRequest() throws Exception {
CheckForDecommissioningNodesRequest request =
CheckForDecommissioningNodesRequest.newInstance("SC-1");
CheckForDecommissioningNodesResponse response =
interceptor.checkForDecommissioningNodes(request);
assertNotNull(response);
Set<NodeId> nodeIds = response.getDecommissioningNodes();
assertNotNull(nodeIds);
assertEquals(0, nodeIds.size());
}
@Test
public void testMapAttributesToNodesRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing mapAttributesToNodes request.",
() -> interceptor.mapAttributesToNodes(null));
// null request2.
NodeAttribute nodeAttribute = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf");
List<NodeAttribute> nodeAttributeList = Collections.singletonList(nodeAttribute);
NodeToAttributes nodeToAttributes = NodeToAttributes.newInstance("host1", nodeAttributeList);
List<NodeToAttributes> nodeToAttributesList = Collections.singletonList(nodeToAttributes);
NodesToAttributesMappingRequest request = NodesToAttributesMappingRequest.newInstance(
AttributeMappingOperationType.ADD, nodeToAttributesList, true, null);
LambdaTestUtils.intercept(YarnException.class, "Missing mapAttributesToNodes SubClusterId.",
() -> interceptor.mapAttributesToNodes(request));
}
@Test
public void testMapAttributesToNodesNormalRequest() throws Exception {
NodeAttribute nodeAttribute = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf");
List<NodeAttribute> nodeAttributeList = Collections.singletonList(nodeAttribute);
NodeToAttributes nodeToAttributes =
NodeToAttributes.newInstance("127.0.0.1", nodeAttributeList);
List<NodeToAttributes> nodeToAttributesList = Collections.singletonList(nodeToAttributes);
NodesToAttributesMappingRequest request = NodesToAttributesMappingRequest.newInstance(
AttributeMappingOperationType.ADD, nodeToAttributesList, true, "SC-1");
interceptor.mapAttributesToNodes(request);
}
@Test
public void testGetGroupsForUserRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(IOException.class, "Missing getGroupsForUser user.",
() -> interceptor.getGroupsForUser(null));
}
@Test
public void testGetGroupsForUserNormalRequest() throws Exception {
String[] groups = interceptor.getGroupsForUser("admin");
assertNotNull(groups);
assertEquals(1, groups.length);
assertEquals("admin", groups[0]);
}
@Test
public void testSaveFederationQueuePolicyErrorRequest() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class, "Missing SaveFederationQueuePolicy request.",
() -> interceptor.saveFederationQueuePolicy(null));
// federationQueueWeight is null.
LambdaTestUtils.intercept(
IllegalArgumentException.class, "FederationQueueWeight cannot be null.",
() -> SaveFederationQueuePolicyRequest.newInstance("root.a", null, "-"));
// queue is null
FederationQueueWeight federationQueueWeight =
FederationQueueWeight.newInstance("SC-1:0.7,SC-2:0.3", "SC-1:0.7,SC-2:0.3", "1.0");
SaveFederationQueuePolicyRequest request =
SaveFederationQueuePolicyRequest.newInstance("", federationQueueWeight, "-");
LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.",
() -> interceptor.saveFederationQueuePolicy(request));
// PolicyManager needs to support weight
FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance(
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
SaveFederationQueuePolicyRequest request2 =
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2,
"TestPolicyManager");
LambdaTestUtils.intercept(YarnException.class,
"TestPolicyManager does not support the use of queue weights.",
() -> interceptor.saveFederationQueuePolicy(request2));
// routerWeight / amrmWeight
// The sum of the routerWeight is not equal to 1.
String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
FederationQueueWeight federationQueueWeight3 = FederationQueueWeight.newInstance(
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
SaveFederationQueuePolicyRequest request3 =
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight3,
policyTypeName);
LambdaTestUtils.intercept(YarnException.class,
"The sum of ratios for all subClusters must be equal to 1.",
() -> interceptor.saveFederationQueuePolicy(request3));
}
@Test
public void testSaveFederationQueuePolicyRequest() throws IOException, YarnException {
// We design unit tests, including 2 SubCluster (SC-1, SC-2)
// Router Weight: SC-1=0.7,SC-2=0.3
// AMRM Weight: SC-1=0.6,SC-2=0.4
// headRoomAlpha: 1.0
String queue = "root.a";
String subCluster1 = "SC-1";
String subCluster2 = "SC-2";
String routerWeight = "SC-1:0.7,SC-2:0.3";
String amrmWeight = "SC-1:0.6,SC-2:0.4";
String headRoomAlpha = "1.0";
// Step1. Write FederationQueue information to stateStore.
String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
FederationQueueWeight federationQueueWeight =
FederationQueueWeight.newInstance(routerWeight, amrmWeight, headRoomAlpha);
SaveFederationQueuePolicyRequest request =
SaveFederationQueuePolicyRequest.newInstance(queue, federationQueueWeight, policyTypeName);
SaveFederationQueuePolicyResponse response = interceptor.saveFederationQueuePolicy(request);
assertNotNull(response);
assertEquals("save policy success.", response.getMessage());
// Step2. We query Policy information from FederationStateStore.
FederationStateStoreFacade federationFacade = interceptor.getFederationFacade();
SubClusterPolicyConfiguration policyConfiguration =
federationFacade.getPolicyConfiguration(queue);
assertNotNull(policyConfiguration);
assertEquals(queue, policyConfiguration.getQueue());
ByteBuffer params = policyConfiguration.getParams();
assertNotNull(params);
WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params);
assertNotNull(weightedPolicyInfo);
SubClusterIdInfo sc1 = new SubClusterIdInfo(subCluster1);
SubClusterIdInfo sc2 = new SubClusterIdInfo(subCluster2);
// Step3. We will compare the accuracy of routerPolicyWeights and amrmPolicyWeights.
Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights();
Float sc1Weight = routerPolicyWeights.get(sc1);
assertNotNull(sc1Weight);
assertEquals(0.7f, sc1Weight.floatValue(), 0.00001);
Float sc2Weight = routerPolicyWeights.get(sc2);
assertNotNull(sc2Weight);
assertEquals(0.3f, sc2Weight.floatValue(), 0.00001);
Map<SubClusterIdInfo, Float> amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights();
Float sc1AMRMWeight = amrmPolicyWeights.get(sc1);
assertNotNull(sc1AMRMWeight);
assertEquals(0.6f, sc1AMRMWeight.floatValue(), 0.00001);
Float sc2AMRMWeight = amrmPolicyWeights.get(sc2);
assertNotNull(sc2AMRMWeight);
assertEquals(0.4f, sc2AMRMWeight.floatValue(), 0.00001);
}
@Test
public void testBatchSaveFederationQueuePoliciesRequest() throws IOException, YarnException {
// subClusters
List<String> subClusterLists = new ArrayList<>();
subClusterLists.add("SC-1");
subClusterLists.add("SC-2");
// generate queue A, queue B, queue C
FederationQueueWeight rootA = generateFederationQueueWeight("root.a", subClusterLists);
FederationQueueWeight rootB = generateFederationQueueWeight("root.b", subClusterLists);
FederationQueueWeight rootC = generateFederationQueueWeight("root.b", subClusterLists);
List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
federationQueueWeights.add(rootA);
federationQueueWeights.add(rootB);
federationQueueWeights.add(rootC);
// Step1. Save Queue Policies in Batches
BatchSaveFederationQueuePoliciesRequest request =
BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights);
BatchSaveFederationQueuePoliciesResponse policiesResponse =
interceptor.batchSaveFederationQueuePolicies(request);
assertNotNull(policiesResponse);
assertNotNull(policiesResponse.getMessage());
assertEquals("batch save policies success.", policiesResponse.getMessage());
// Step2. We query Policy information from FederationStateStore.
FederationStateStoreFacade federationFacade = interceptor.getFederationFacade();
SubClusterPolicyConfiguration policyConfiguration =
federationFacade.getPolicyConfiguration("root.a");
assertNotNull(policyConfiguration);
assertEquals("root.a", policyConfiguration.getQueue());
ByteBuffer params = policyConfiguration.getParams();
assertNotNull(params);
WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params);
assertNotNull(weightedPolicyInfo);
Map<SubClusterIdInfo, Float> amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights();
Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights();
SubClusterIdInfo sc1 = new SubClusterIdInfo("SC-1");
SubClusterIdInfo sc2 = new SubClusterIdInfo("SC-2");
// Check whether the AMRMWeight of SC-1 and SC-2 of root.a meet expectations
FederationQueueWeight queueWeight = federationQueueWeights.get(0);
Map<SubClusterIdInfo, Float> subClusterAmrmWeightMap =
interceptor.getSubClusterWeightMap(queueWeight.getAmrmWeight());
Float sc1ExpectedAmrmWeightFloat = amrmPolicyWeights.get(sc1);
Float sc1AmrmWeightFloat = subClusterAmrmWeightMap.get(sc1);
assertNotNull(sc1AmrmWeightFloat);
assertEquals(sc1ExpectedAmrmWeightFloat, sc1AmrmWeightFloat, 0.00001);
Float sc2ExpectedAmrmWeightFloat = amrmPolicyWeights.get(sc2);
Float sc2AmrmWeightFloat = subClusterAmrmWeightMap.get(sc2);
assertNotNull(sc2ExpectedAmrmWeightFloat);
assertEquals(sc2ExpectedAmrmWeightFloat, sc2AmrmWeightFloat, 0.00001);
// Check whether the RouterPolicyWeight of SC-1 and SC-2 of root.a meet expectations
Map<SubClusterIdInfo, Float> subClusterRouterWeightMap =
interceptor.getSubClusterWeightMap(queueWeight.getRouterWeight());
Float sc1ExpectedRouterWeightFloat = routerPolicyWeights.get(sc1);
Float sc1RouterWeightFloat = subClusterRouterWeightMap.get(sc1);
assertNotNull(sc1RouterWeightFloat);
assertEquals(sc1ExpectedRouterWeightFloat, sc1RouterWeightFloat, 0.00001);
Float sc2ExpectedRouterWeightFloat = routerPolicyWeights.get(sc2);
Float sc2RouterWeightFloat = subClusterRouterWeightMap.get(sc2);
assertNotNull(sc2ExpectedRouterWeightFloat);
assertEquals(sc2ExpectedRouterWeightFloat, sc2RouterWeightFloat, 0.00001);
}
/**
* Generate FederationQueueWeight.
* We will generate the weight information of the queue.
*
* @param queue queue name
* @param pSubClusters subClusters
* @return subCluster FederationQueueWeight
*/
private FederationQueueWeight generateFederationQueueWeight(
String queue, List<String> pSubClusters) {
String routerWeight = generatePolicyWeight(pSubClusters);
String amrmWeight = generatePolicyWeight(pSubClusters);
String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
String headRoomAlpha = "1.0";
return FederationQueueWeight.newInstance(routerWeight, amrmWeight, headRoomAlpha,
queue, policyTypeName);
}
/**
* Generating Policy Weight Data.
*
* @param pSubClusters set of sub-clusters.
* @return policy Weight String, like SC-1:0.7,SC-2:0.3
*/
private String generatePolicyWeight(List<String> pSubClusters) {
List<String> weights = generateWeights(pSubClusters.size());
List<String> subClusterWeight = new ArrayList<>();
for (int i = 0; i < pSubClusters.size(); i++) {
String subCluster = pSubClusters.get(i);
String weight = weights.get(i);
subClusterWeight.add(subCluster + ":" + weight);
}
return StringUtils.join(subClusterWeight, ",");
}
/**
* Generate a set of random numbers, and the sum of the numbers is 1.
*
* @param n number of random numbers generated.
* @return a set of random numbers
*/
private List<String> generateWeights(int n) {
List<Float> randomNumbers = new ArrayList<>();
float total = 0.0f;
Random random = new Random();
for (int i = 0; i < n - 1; i++) {
float randNum = random.nextFloat();
randomNumbers.add(randNum);
total += randNum;
}
float lastNumber = 1 - total;
randomNumbers.add(lastNumber);
DecimalFormat decimalFormat = new DecimalFormat("#.##");
List<String> formattedRandomNumbers = new ArrayList<>();
for (double number : randomNumbers) {
String formattedNumber = decimalFormat.format(number);
formattedRandomNumbers.add(formattedNumber);
}
return formattedRandomNumbers;
}
@Test
public void testParsePolicyWeights() {
Map<SubClusterIdInfo, Float> policyWeights = new LinkedHashMap<>();
SubClusterIdInfo sc1 = new SubClusterIdInfo("SC-1");
policyWeights.put(sc1, 0.7f);
SubClusterIdInfo sc2 = new SubClusterIdInfo("SC-2");
policyWeights.put(sc2, 0.3f);
String policyWeight = interceptor.parsePolicyWeights(policyWeights);
assertEquals("SC-1:0.7,SC-2:0.3", policyWeight);
}
@Test
public void testFilterPoliciesConfigurationsByQueues() throws Exception {
// SubClusters : SC-1,SC-2
List<String> subClusterLists = new ArrayList<>();
subClusterLists.add("SC-1");
subClusterLists.add("SC-2");
// We initialize 26 queues, queue root.a~root.z
List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
for (char letter = 'a'; letter <= 'z'; letter++) {
FederationQueueWeight leaf =
generateFederationQueueWeight("root." + letter, subClusterLists);
federationQueueWeights.add(leaf);
}
// Save Queue Policies in Batches
BatchSaveFederationQueuePoliciesRequest request =
BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights);
BatchSaveFederationQueuePoliciesResponse policiesResponse =
interceptor.batchSaveFederationQueuePolicies(request);
assertNotNull(policiesResponse);
assertNotNull(policiesResponse.getMessage());
assertEquals("batch save policies success.", policiesResponse.getMessage());
// We query 12 queues, root.a ~ root.l
List<String> queues = new ArrayList<>();
for (char letter = 'a'; letter <= 'l'; letter++) {
queues.add("root." + letter);
}
// Queue1: We query page 1, 10 items per page, and the returned result should be 10 items.
// TotalPage should be 2, TotalSize should be 12.
QueryFederationQueuePoliciesRequest request1 =
QueryFederationQueuePoliciesRequest.newInstance(10, 1, "", queues);
QueryFederationQueuePoliciesResponse response1 =
interceptor.listFederationQueuePolicies(request1);
assertNotNull(response1);
assertEquals(1, response1.getCurrentPage());
assertEquals(10, response1.getPageSize());
assertEquals(2, response1.getTotalPage());
assertEquals(12, response1.getTotalSize());
List<FederationQueueWeight> federationQueueWeights1 = response1.getFederationQueueWeights();
assertNotNull(federationQueueWeights1);
assertEquals(10, federationQueueWeights1.size());
// Queue2: We query page 1, 12 items per page, and the returned result should be 12 items.
// TotalPage should be 1, TotalSize should be 12.
QueryFederationQueuePoliciesRequest request2 =
QueryFederationQueuePoliciesRequest.newInstance(12, 1, "", queues);
QueryFederationQueuePoliciesResponse response2 =
interceptor.listFederationQueuePolicies(request2);
assertNotNull(response2);
assertEquals(1, response2.getCurrentPage());
assertEquals(12, response2.getPageSize());
assertEquals(1, response2.getTotalPage());
assertEquals(12, response2.getTotalSize());
List<FederationQueueWeight> federationQueueWeights2 = response2.getFederationQueueWeights();
assertNotNull(federationQueueWeights2);
assertEquals(12, federationQueueWeights2.size());
// Queue3: Boundary limit exceeded
// We filter 12 queues, should return 12 records, 12 per page,
// should return 1 page, but we are going to return page 2.
QueryFederationQueuePoliciesRequest request3 =
QueryFederationQueuePoliciesRequest.newInstance(12, 2, "", queues);
QueryFederationQueuePoliciesResponse response3 =
interceptor.listFederationQueuePolicies(request3);
assertNotNull(response3);
assertEquals(2, response3.getCurrentPage());
assertEquals(12, response3.getPageSize());
assertEquals(1, response2.getTotalPage());
assertEquals(12, response3.getTotalSize());
List<FederationQueueWeight> federationQueueWeights3 = response3.getFederationQueueWeights();
assertNotNull(federationQueueWeights3);
assertEquals(0, federationQueueWeights3.size());
// Queue4: Boundary limit exceeded
// We pass in some negative parameters and we will get some exceptions
QueryFederationQueuePoliciesRequest request4 =
QueryFederationQueuePoliciesRequest.newInstance(-1, 2, "", queues);
LambdaTestUtils.intercept(YarnException.class, "PageSize cannot be negative or zero.",
() -> interceptor.listFederationQueuePolicies(request4));
// Queue5: Boundary limit exceeded
// We pass in some negative parameters and we will get some exceptions
QueryFederationQueuePoliciesRequest request5 =
QueryFederationQueuePoliciesRequest.newInstance(10, -1, "", queues);
LambdaTestUtils.intercept(YarnException.class, "CurrentPage cannot be negative or zero.",
() -> interceptor.listFederationQueuePolicies(request5));
// Queue6: We use Queue as the condition,
// at this time we will only get the only one return value.
QueryFederationQueuePoliciesRequest request6 =
QueryFederationQueuePoliciesRequest.newInstance(10, 1, "root.a", null);
QueryFederationQueuePoliciesResponse response6 =
interceptor.listFederationQueuePolicies(request6);
assertNotNull(response6);
assertEquals(1, response6.getCurrentPage());
assertEquals(10, response6.getPageSize());
assertEquals(1, response6.getTotalPage());
assertEquals(1, response6.getTotalSize());
List<FederationQueueWeight> federationQueueWeights6 = response6.getFederationQueueWeights();
assertNotNull(federationQueueWeights6);
assertEquals(1, federationQueueWeights6.size());
// Queue7: We design such a test case, we do not set any filter conditions,
// but we need to get the return results
QueryFederationQueuePoliciesRequest request7 =
QueryFederationQueuePoliciesRequest.newInstance(10, 1, null, null);
QueryFederationQueuePoliciesResponse response7 =
interceptor.listFederationQueuePolicies(request7);
assertNotNull(response7);
assertEquals(1, response7.getCurrentPage());
assertEquals(10, response7.getPageSize());
assertEquals(3, response7.getTotalPage());
assertEquals(26, response7.getTotalSize());
List<FederationQueueWeight> federationQueueWeights7 = response7.getFederationQueueWeights();
assertNotNull(federationQueueWeights7);
assertEquals(10, federationQueueWeights7.size());
// Queue8: We are designing a unit test where the number of records
// we need to retrieve exceeds the maximum number of Policies available.
QueryFederationQueuePoliciesRequest request8 =
QueryFederationQueuePoliciesRequest.newInstance(10, 10, null, null);
LambdaTestUtils.intercept(YarnException.class,
"The index of the records to be retrieved has exceeded the maximum index.",
() -> interceptor.listFederationQueuePolicies(request8));
}
@Test
public void testDeleteFederationApplication() throws Exception {
ApplicationId applicationId = ApplicationId.newInstance(10, 1);
DeleteFederationApplicationRequest request1 =
DeleteFederationApplicationRequest.newInstance(applicationId.toString());
LambdaTestUtils.intercept(YarnException.class,
"Application application_10_0001 does not exist.",
() -> interceptor.deleteFederationApplication(request1));
ApplicationId applicationId2 = ApplicationId.newInstance(10, 2);
SubClusterId homeSubCluster = SubClusterId.newInstance("SC-1");
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId2, homeSubCluster);
facade.addApplicationHomeSubCluster(appHomeSubCluster);
DeleteFederationApplicationRequest request2 =
DeleteFederationApplicationRequest.newInstance(applicationId2.toString());
DeleteFederationApplicationResponse deleteFederationApplicationResponse =
interceptor.deleteFederationApplication(request2);
assertNotNull(deleteFederationApplicationResponse);
assertEquals("applicationId = " + applicationId2 + " delete success.",
deleteFederationApplicationResponse.getMessage());
}
@Test
public void testGetFederationSubClusters() throws Exception {
LambdaTestUtils.intercept(YarnException.class,
"Missing getFederationSubClusters Request.",
() -> interceptor.getFederationSubClusters(null));
GetSubClustersRequest request = GetSubClustersRequest.newInstance();
GetSubClustersResponse federationSubClusters = interceptor.getFederationSubClusters(request);
assertNotNull(federationSubClusters);
List<FederationSubCluster> federationSubClustersList =
federationSubClusters.getFederationSubClusters();
assertNotNull(federationSubClustersList);
assertEquals(4, federationSubClustersList.size());
}
@Test
public void testDeleteFederationPoliciesByQueues() throws IOException, YarnException {
// subClusters
List<String> subClusterLists = new ArrayList<>();
subClusterLists.add("SC-1");
subClusterLists.add("SC-2");
// generate queue A, queue B, queue C
FederationQueueWeight rootA = generateFederationQueueWeight("root.a", subClusterLists);
FederationQueueWeight rootB = generateFederationQueueWeight("root.b", subClusterLists);
FederationQueueWeight rootC = generateFederationQueueWeight("root.c", subClusterLists);
List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
federationQueueWeights.add(rootA);
federationQueueWeights.add(rootB);
federationQueueWeights.add(rootC);
// Step1. Save Queue Policies in Batches
BatchSaveFederationQueuePoliciesRequest request =
BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights);
BatchSaveFederationQueuePoliciesResponse policiesResponse =
interceptor.batchSaveFederationQueuePolicies(request);
assertNotNull(policiesResponse);
assertNotNull(policiesResponse.getMessage());
assertEquals("batch save policies success.", policiesResponse.getMessage());
// Step2. Delete the policy of root.c
List<String> deleteQueues = new ArrayList<>();
deleteQueues.add("root.c");
DeleteFederationQueuePoliciesRequest deleteRequest =
DeleteFederationQueuePoliciesRequest.newInstance(deleteQueues);
DeleteFederationQueuePoliciesResponse deleteResponse =
interceptor.deleteFederationPoliciesByQueues(deleteRequest);
assertNotNull(deleteResponse);
String message = deleteResponse.getMessage();
assertEquals("queues = root.c delete success.", message);
}
}