TestRMWebServicesForCSWithPartitions.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_ALLOCATION_STATE;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_ALLOCATIONS;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_DIAGNOSTIC;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_NAME;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.getFirstSubNodeFromJson;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocations;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestWebServiceUtil.createRM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.XMLUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.TestProperties;
@RunWith(Parameterized.class)
public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
private static final String DEFAULT_PARTITION = "";
private static final String CAPACITIES = "capacities";
private static final String RESOURCE_USAGES_BY_PARTITION =
"resourceUsagesByPartition";
private static final String QUEUE_CAPACITIES_BY_PARTITION =
"queueCapacitiesByPartition";
private static final String QUEUE_C = "Qc";
private static final String LEAF_QUEUE_C1 = "Qc1";
private static final String LEAF_QUEUE_C2 = "Qc2";
private static final String QUEUE_B = "Qb";
private static final String QUEUE_A = "Qa";
private static final String LABEL_LY = "Ly";
private static final String LABEL_LX = "Lx";
private static final QueuePath ROOT_QUEUE_PATH =
new QueuePath(CapacitySchedulerConfiguration.ROOT);
private static final ImmutableSet<String> CLUSTER_LABELS =
ImmutableSet.of(LABEL_LX, LABEL_LY, DEFAULT_PARTITION);
private static final String DOT = ".";
private static final double EPSILON = 1e-1f;
private MockRM rm;
private CapacitySchedulerConfiguration csConf;
private YarnConfiguration conf;
private final boolean legacyQueueMode;
@Parameterized.Parameters(name = "{index}: legacy-queue-mode={0}")
public static Collection<Boolean> getParameters() {
return Arrays.asList(true, false);
}
private MockNM nm1;
@Override
protected Application configure() {
ResourceConfig config = new ResourceConfig();
config.register(RMWebServices.class);
config.register(new JerseyBinder());
config.register(GenericExceptionHandler.class);
config.register(new JettisonFeature()).register(JAXBContextResolver.class);
forceSet(TestProperties.CONTAINER_PORT, JERSEY_RANDOM_PORT);
return config;
}
private class JerseyBinder extends AbstractBinder {
@Override
protected void configure() {
try {
csConf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
conf = new YarnConfiguration(csConf);
rm = createRM(conf);
Set<NodeLabel> labels = new HashSet<NodeLabel>();
labels.add(NodeLabel.newInstance(LABEL_LX));
labels.add(NodeLabel.newInstance(LABEL_LY));
try {
RMNodeLabelsManager nodeLabelManager =
rm.getRMContext().getNodeLabelManager();
nodeLabelManager.addToCluserNodeLabels(labels);
} catch (Exception e) {
Assert.fail();
}
rm.start();
rm.getRMContext().getNodeLabelManager().addLabelsToNode(ImmutableMap
.of(NodeId.newInstance("127.0.0.1", 0), Sets.newHashSet(LABEL_LX)));
nm1 = new MockNM("127.0.0.1:1234", 2 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 2 * 1024, rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
rm.getRMContext().getNodeLabelManager().addLabelsToNode(ImmutableMap
.of(NodeId.newInstance("127.0.0.2", 0), Sets.newHashSet(LABEL_LY)));
MockNM nm3 = new MockNM("127.0.0.2:1234", 128 * 1024, rm.getResourceTrackerService());
nm3.registerNode();
// Default partition
MockNM nm4 = new MockNM("127.0.0.3:1234", 128 * 1024, rm.getResourceTrackerService());
nm4.registerNode();
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getScheme()).thenReturn("http");
final HttpServletResponse response = mock(HttpServletResponse.class);
bind(rm).to(ResourceManager.class).named("rm");
bind(conf).to(Configuration.class).named("conf");
bind(request).to(HttpServletRequest.class);
bind(response).to(HttpServletResponse.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
config.setLegacyQueueModeEnabled(legacyQueueMode);
// Define top-level queues
config.setQueues(ROOT_QUEUE_PATH,
new String[] { QUEUE_A, QUEUE_B, QUEUE_C });
String interMediateQueueC =
CapacitySchedulerConfiguration.ROOT + "." + QUEUE_C;
QueuePath interMediateQueueCPath = new QueuePath(interMediateQueueC);
config.setQueues(interMediateQueueCPath,
new String[] { LEAF_QUEUE_C1, LEAF_QUEUE_C2 });
config.setCapacityByLabel(ROOT_QUEUE_PATH, LABEL_LX, 100);
config.setCapacityByLabel(ROOT_QUEUE_PATH, LABEL_LY, 100);
String leafQueueA = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_A;
QueuePath leafQueueAPath = new QueuePath(leafQueueA);
config.setCapacity(leafQueueAPath, 30);
config.setMaximumCapacity(leafQueueAPath, 50);
String leafQueueB = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_B;
QueuePath leafQueueBPath = new QueuePath(leafQueueB);
config.setCapacity(leafQueueBPath, 30);
config.setMaximumCapacity(leafQueueBPath, 50);
config.setCapacity(interMediateQueueCPath, 40);
config.setMaximumCapacity(interMediateQueueCPath, 50);
String leafQueueC1 = interMediateQueueC + "." + LEAF_QUEUE_C1;
QueuePath leafQueueC1Path = new QueuePath(leafQueueC1);
config.setCapacity(leafQueueC1Path, 50);
config.setMaximumCapacity(leafQueueC1Path, 60);
String leafQueueC2 = interMediateQueueC + "." + LEAF_QUEUE_C2;
QueuePath leafQueueC2Path = new QueuePath(leafQueueC2);
config.setCapacity(leafQueueC2Path, 50);
config.setMaximumCapacity(leafQueueC2Path, 70);
// Define label specific configuration
config.setAccessibleNodeLabels(
leafQueueAPath, ImmutableSet.of(DEFAULT_PARTITION));
config.setAccessibleNodeLabels(leafQueueBPath, ImmutableSet.of(LABEL_LX));
config.setAccessibleNodeLabels(interMediateQueueCPath,
ImmutableSet.of(LABEL_LX, LABEL_LY));
config.setAccessibleNodeLabels(leafQueueC1Path,
ImmutableSet.of(LABEL_LX, LABEL_LY));
config.setAccessibleNodeLabels(leafQueueC2Path,
ImmutableSet.of(LABEL_LX, LABEL_LY));
config.setDefaultNodeLabelExpression(leafQueueBPath, LABEL_LX);
config.setDefaultNodeLabelExpression(leafQueueC1Path, LABEL_LX);
config.setDefaultNodeLabelExpression(leafQueueC2Path, LABEL_LY);
config.setCapacityByLabel(leafQueueBPath, LABEL_LX, 30);
config.setCapacityByLabel(interMediateQueueCPath, LABEL_LX, 70);
config.setCapacityByLabel(leafQueueC1Path, LABEL_LX, 40);
config.setCapacityByLabel(leafQueueC2Path, LABEL_LX, 60);
config.setCapacityByLabel(interMediateQueueCPath, LABEL_LY, 100);
config.setCapacityByLabel(leafQueueC1Path, LABEL_LY, 50);
config.setCapacityByLabel(leafQueueC2Path, LABEL_LY, 50);
config.setMaximumCapacityByLabel(leafQueueC1Path, LABEL_LY, 75);
config.setMaximumCapacityByLabel(leafQueueC2Path, LABEL_LY, 75);
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
@After
public void tearDown() {
if (rm != null) {
rm.stop();
}
}
public TestRMWebServicesForCSWithPartitions(boolean legacyQueueMode) {
this.legacyQueueMode = legacyQueueMode;
}
@Test
public void testSchedulerPartitions() throws JSONException, Exception {
WebTarget r = targetWithJsonObject();
Response response =
r.path("ws").path("v1").path("cluster").path("scheduler")
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
verifySchedulerInfoJson(json);
}
@Test
public void testSchedulerPartitionsSlash() throws JSONException, Exception {
WebTarget r = targetWithJsonObject();
Response response =
r.path("ws").path("v1").path("cluster").path("scheduler/")
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
verifySchedulerInfoJson(json);
}
@Test
public void testSchedulerPartitionsDefault() throws JSONException, Exception {
WebTarget r = targetWithJsonObject();
Response response = r.path("ws").path("v1").path("cluster")
.path("scheduler").request().get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
verifySchedulerInfoJson(json);
}
@Test
public void testSchedulerPartitionsXML() throws JSONException, Exception {
WebTarget r = target();
Response response =
r.path("ws").path("v1").path("cluster").path("scheduler")
.request(MediaType.APPLICATION_XML).get(Response.class);
assertEquals(MediaType.APPLICATION_XML_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
String xml = response.readEntity(String.class);
DocumentBuilderFactory dbf = XMLUtils.newSecureDocumentBuilderFactory();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
verifySchedulerInfoXML(dom);
}
@Test
public void testPartitionInSchedulerActivities() throws Exception {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue(QUEUE_B)
.withAmLabel(LABEL_LX)
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.resourceName("*").nodeLabelExpression(LABEL_LX)
.capability(Resources.createResource(2048)).numContainers(1)
.build()), null);
WebTarget sr = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(RMWSConsts.SCHEDULER_ACTIVITIES);
ActivitiesTestUtils.requestWebResource(sr, null);
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
JSONObject schedulerActivitiesJson =
ActivitiesTestUtils.requestWebResource(sr, null);
/*
* verify scheduler activities
*/
verifyNumberOfAllocations(schedulerActivitiesJson, 1);
// verify queue Qb
Predicate<JSONObject> findQueueBPred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME)
.equals(CapacitySchedulerConfiguration.ROOT + DOT + QUEUE_B);
List<JSONObject> queueBObj = ActivitiesTestUtils.findInAllocations(
getFirstSubNodeFromJson(schedulerActivitiesJson,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS), findQueueBPred);
assertEquals(1, queueBObj.size());
assertEquals(ActivityState.REJECTED.name(),
queueBObj.get(0).optString(FN_ACT_ALLOCATION_STATE));
assertEquals(ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
+ " from " + am1.getApplicationAttemptId().getApplicationId(),
queueBObj.get(0).optString(FN_ACT_DIAGNOSTIC));
// verify queue Qa
Predicate<JSONObject> findQueueAPred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME)
.equals(CapacitySchedulerConfiguration.ROOT + DOT + QUEUE_A);
List<JSONObject> queueAObj = ActivitiesTestUtils.findInAllocations(
getFirstSubNodeFromJson(schedulerActivitiesJson,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS), findQueueAPred);
assertEquals(1, queueAObj.size());
assertEquals(ActivityState.REJECTED.name(),
queueAObj.get(0).optString(FN_ACT_ALLOCATION_STATE));
assertEquals(
ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION,
queueAObj.get(0).optString(FN_ACT_DIAGNOSTIC));
// verify queue Qc
Predicate<JSONObject> findQueueCPred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME)
.equals(CapacitySchedulerConfiguration.ROOT + DOT + QUEUE_C);
List<JSONObject> queueCObj = ActivitiesTestUtils.findInAllocations(
getFirstSubNodeFromJson(schedulerActivitiesJson,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS), findQueueCPred);
assertEquals(1, queueCObj.size());
assertEquals(ActivityState.SKIPPED.name(),
queueCObj.get(0).optString(FN_ACT_ALLOCATION_STATE));
assertEquals(ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE,
queueCObj.get(0).optString(FN_ACT_DIAGNOSTIC));
}
private void verifySchedulerInfoXML(Document dom) throws Exception {
NodeList scheduler = dom.getElementsByTagName("scheduler");
assertEquals("incorrect number of elements", 1, scheduler.getLength());
NodeList schedulerInfo = dom.getElementsByTagName("schedulerInfo");
assertEquals("incorrect number of elements", 1, schedulerInfo.getLength());
for (int i = 0; i < schedulerInfo.getLength(); i++) {
Element element = (Element) schedulerInfo.item(i);
NodeList children = element.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element schedulerInfoElem = (Element) children.item(j);
if (schedulerInfoElem.getTagName().equals("queues")) {
NodeList qListInfos = schedulerInfoElem.getChildNodes();
for (int k = 0; k < qListInfos.getLength(); k++) {
Element qElem2 = (Element) qListInfos.item(k);
String queue =
WebServicesTestUtils.getXmlString(qElem2, "queueName");
switch (queue) {
case QUEUE_A:
verifyQueueAInfoXML(qElem2);
break;
case QUEUE_B:
verifyQueueBInfoXML(qElem2);
break;
case QUEUE_C:
verifyQueueCInfoXML(qElem2);
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
} else if (schedulerInfoElem.getTagName().equals(CAPACITIES)) {
NodeList capacitiesListInfos = schedulerInfoElem.getChildNodes();
assertEquals("incorrect number of partitions", 3,
capacitiesListInfos.getLength());
for (int k = 0; k < capacitiesListInfos.getLength(); k++) {
Element partitionCapacitiesInfo =
(Element) capacitiesListInfos.item(k);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
assertTrue("invalid PartitionCapacityInfo",
CLUSTER_LABELS.contains(partitionName));
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 100, 0, 100,
100, 0, 100);
}
}
}
}
}
private void verifyQueueAInfoXML(Element queueElem) {
NodeList children = queueElem.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element queueChildElem = (Element) children.item(j);
if (queueChildElem.getTagName().equals(CAPACITIES)) {
NodeList capacitiesListInfos = queueChildElem.getChildNodes();
assertEquals("incorrect number of partitions", 1,
capacitiesListInfos.getLength());
Element partitionCapacitiesInfo = (Element) capacitiesListInfos.item(0);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
assertTrue("invalid PartitionCapacityInfo",
partitionName.isEmpty());
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 30, 0, 50, 30,
0, 50);
} else if (queueChildElem.getTagName().equals("resources")) {
verifyResourceUsageInfoXML(queueChildElem);
}
}
}
private void verifyQueueBInfoXML(Element queueElem) {
assertEquals("Invalid default Label expression", LABEL_LX,
WebServicesTestUtils.getXmlString(queueElem,
"defaultNodeLabelExpression"));
NodeList children = queueElem.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element queueChildElem = (Element) children.item(j);
if (queueChildElem.getTagName().equals(CAPACITIES)) {
NodeList capacitiesListInfos = queueChildElem.getChildNodes();
assertEquals("incorrect number of partitions", 2,
capacitiesListInfos.getLength());
for (int k = 0; k < capacitiesListInfos.getLength(); k++) {
Element partitionCapacitiesInfo =
(Element) capacitiesListInfos.item(k);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 30, 0, 100,
30, 0, 100);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 30, 0, 50,
30, 0, 50);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
}
}
assertEquals("Node Labels are not matching", LABEL_LX,
WebServicesTestUtils.getXmlString(queueElem, "nodeLabels"));
}
private void verifyQueueCInfoXML(Element queueElem) {
NodeList children = queueElem.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element queueChildElem = (Element) children.item(j);
if (queueChildElem.getTagName().equals(CAPACITIES)) {
verifyQcCapacitiesInfoXML(queueChildElem, 70, 100, 70, 100, 100, 100,
100, 100, 40, 50, 40, 50);
} else if (queueChildElem.getTagName().equals("resources")) {
verifyResourceUsageInfoXML(queueChildElem);
} else if (queueChildElem.getTagName().equals("queues")) {
NodeList qListInfos = queueChildElem.getChildNodes();
for (int k = 0; k < qListInfos.getLength(); k++) {
Element qElem2 = (Element) qListInfos.item(k);
String queue = WebServicesTestUtils.getXmlString(qElem2, "queueName");
switch (queue) {
case LEAF_QUEUE_C1:
assertEquals("Invalid default Label expression", LABEL_LX,
WebServicesTestUtils.getXmlString(qElem2,
"defaultNodeLabelExpression"));
NodeList queuec1Children = qElem2.getChildNodes();
for (int l = 0; l < queuec1Children.getLength(); l++) {
Element queueC1ChildElem = (Element) queuec1Children.item(l);
if (queueC1ChildElem.getTagName().equals(CAPACITIES)) {
verifyQcCapacitiesInfoXML(queueC1ChildElem, 40, 100, 28, 100,
50, 75, 50, 75, 50, 60, 20, 30);
}
}
break;
case LEAF_QUEUE_C2:
assertEquals("Invalid default Label expression", LABEL_LY,
WebServicesTestUtils.getXmlString(qElem2,
"defaultNodeLabelExpression"));
NodeList queuec2Children = qElem2.getChildNodes();
for (int l = 0; l < queuec2Children.getLength(); l++) {
Element queueC2ChildElem = (Element) queuec2Children.item(l);
if (queueC2ChildElem.getTagName().equals(CAPACITIES)) {
verifyQcCapacitiesInfoXML(queueC2ChildElem, 60, 100, 42, 100,
50, 75, 50, 75, 50, 70, 20, 35);
}
}
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
}
}
}
private void verifyQcCapacitiesInfoXML(Element partitionCapacitiesElem,
float lxCaps, float lxMaxCaps, float lxAbsCaps, float lxAbsMaxCaps,
float lyCaps, float lyMaxCaps, float lyAbsCaps, float lyAbsMaxCaps,
float defCaps, float defMaxCaps, float defAbsCaps, float defAbsMaxCaps) {
NodeList capacitiesListInfos = partitionCapacitiesElem.getChildNodes();
assertEquals("incorrect number of partitions", 3,
capacitiesListInfos.getLength());
for (int k = 0; k < capacitiesListInfos.getLength(); k++) {
Element partitionCapacitiesInfo = (Element) capacitiesListInfos.item(k);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, lxCaps, 0,
lxMaxCaps, lxAbsCaps, 0, lxAbsMaxCaps);
break;
case LABEL_LY:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, lyCaps, 0,
lyMaxCaps, lyAbsCaps, 0, lyAbsMaxCaps);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, defCaps, 0,
defMaxCaps, defAbsCaps, 0, defAbsMaxCaps);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
}
private void verifyResourceUsageInfoXML(Element queueChildElem) {
NodeList resourceUsageInfo = queueChildElem.getChildNodes();
assertEquals("incorrect number of partitions", 1,
resourceUsageInfo.getLength());
Element partitionResourceUsageInfo = (Element) resourceUsageInfo.item(0);
String partitionName = WebServicesTestUtils
.getXmlString(partitionResourceUsageInfo, "partitionName");
assertTrue("invalid PartitionCapacityInfo",
DEFAULT_PARTITION.equals(partitionName));
}
private void verifyPartitionCapacityInfoXML(Element partitionInfo,
float capacity, float usedCapacity, float maxCapacity,
float absoluteCapacity, float absoluteUsedCapacity,
float absoluteMaxCapacity) {
assertEquals("capacity doesn't match", capacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "capacity"), EPSILON);
assertEquals("capacity doesn't match", usedCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "usedCapacity"), EPSILON);
assertEquals("capacity doesn't match", maxCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "maxCapacity"), EPSILON);
assertEquals("capacity doesn't match", absoluteCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "absoluteCapacity"),
EPSILON);
assertEquals("capacity doesn't match", absoluteUsedCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "absoluteUsedCapacity"),
EPSILON);
assertEquals("capacity doesn't match", absoluteMaxCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "absoluteMaxCapacity"),
EPSILON);
}
private void verifySchedulerInfoJson(JSONObject json)
throws JSONException, Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 25, info.length());
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
assertEquals("incorrect number of elements", CLUSTER_LABELS.size(),
partitionsCapsArray.length());
for (int i = 0; i < partitionsCapsArray.length(); i++) {
JSONObject partitionInfo = partitionsCapsArray.getJSONObject(i);
String partitionName = partitionInfo.getString("partitionName");
assertTrue("Unknown partition received",
CLUSTER_LABELS.contains(partitionName));
verifyPartitionCapacityInfoJson(partitionInfo, 100, 0, 100, 100, 0, 100);
}
JSONObject jsonQueuesObject = info.getJSONObject("queues");
JSONArray queuesArray = jsonQueuesObject.getJSONArray("queue");
for (int i = 0; i < queuesArray.length(); i++) {
JSONObject queueJson = queuesArray.getJSONObject(i);
String queue = queueJson.getString("queueName");
Object resourceUsagesByPartition = queueJson.getJSONObject("resources").
get(RESOURCE_USAGES_BY_PARTITION);
JSONArray resourceUsageByPartition = new JSONArray();
if (resourceUsagesByPartition instanceof JSONArray) {
resourceUsageByPartition = JSONArray.class.cast(resourceUsagesByPartition);
} else {
resourceUsageByPartition.put(JSONObject.class.cast(resourceUsagesByPartition));
}
JSONObject resourcesJsonObject = queueJson.getJSONObject("resources");
Object resourceUsagesByPartition2 = resourcesJsonObject.
get(RESOURCE_USAGES_BY_PARTITION);
JSONArray partitionsResourcesArray = new JSONArray();
if (resourceUsagesByPartition2 instanceof JSONArray) {
partitionsResourcesArray = JSONArray.class.cast(resourceUsagesByPartition2);
} else {
partitionsResourcesArray.put(resourceUsagesByPartition2);
}
capacitiesJsonObject = queueJson.getJSONObject(CAPACITIES);
Object queueCapacitiesByPartition = capacitiesJsonObject.
get(QUEUE_CAPACITIES_BY_PARTITION);
partitionsCapsArray = new JSONArray();
if (queueCapacitiesByPartition instanceof JSONArray) {
partitionsCapsArray = JSONArray.class.cast(queueCapacitiesByPartition);
} else {
partitionsCapsArray.put(queueCapacitiesByPartition);
}
JSONObject partitionInfo = null;
String partitionName = null;
switch (queue) {
case QUEUE_A:
assertEquals("incorrect number of partitions", 1,
partitionsCapsArray.length());
partitionInfo = partitionsCapsArray.getJSONObject(0);
partitionName = partitionInfo.getString("partitionName");
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50);
assertEquals("incorrect number of elements", 7,
partitionsResourcesArray.getJSONObject(0).length());
assertEquals("incorrect number of objects", 1,
resourceUsageByPartition.length());
break;
case QUEUE_B:
assertEquals("Invalid default Label expression", LABEL_LX,
queueJson.getString("defaultNodeLabelExpression"));
assertEquals("incorrect number of elements", 7,
partitionsResourcesArray.getJSONObject(0).length());
verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX));
assertEquals("incorrect number of partitions", 2,
partitionsCapsArray.length());
assertEquals("incorrect number of objects", 2,
resourceUsageByPartition.length());
for (int j = 0; j < partitionsCapsArray.length(); j++) {
partitionInfo = partitionsCapsArray.getJSONObject(j);
partitionName = partitionInfo.getString("partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 100, 30, 0,
100);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0,
50);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
break;
case QUEUE_C:
verifyAccesibleNodeLabels(queueJson,
ImmutableSet.of(LABEL_LX, LABEL_LY));
assertEquals("incorrect number of elements", 4,
partitionsResourcesArray.getJSONObject(0).length());
verifyQcPartitionsCapacityInfoJson(partitionsCapsArray, 70, 100, 70,
100, 100, 100, 100, 100, 40, 50, 40, 50);
verifySubQueuesOfQc(queueJson);
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
}
private void verifyAccesibleNodeLabels(JSONObject queueJson,
Set<String> accesibleNodeLabels) throws JSONException {
Object nodeLabelsObj = queueJson.get("nodeLabels");
JSONArray nodeLabels = new JSONArray();
if(nodeLabelsObj instanceof JSONArray){
nodeLabels = queueJson.getJSONArray("nodeLabels");
} else {
nodeLabels.put(nodeLabelsObj);
}
assertEquals("number of accessible Node Labels not matching",
accesibleNodeLabels.size(), nodeLabels.length());
for (int i = 0; i < nodeLabels.length(); i++) {
assertTrue("Invalid accessible node label : " + nodeLabels.getString(i),
accesibleNodeLabels.contains(nodeLabels.getString(i)));
}
}
private void verifySubQueuesOfQc(JSONObject queueCJson) throws JSONException {
JSONObject jsonQueuesObject = queueCJson.getJSONObject("queues");
JSONArray queuesArray = jsonQueuesObject.getJSONArray("queue");
for (int i = 0; i < queuesArray.length(); i++) {
JSONObject queueJson = queuesArray.getJSONObject(i);
String queue = queueJson.getString("queueName");
JSONObject capacitiesJsonObject = queueJson.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
switch (queue) {
case LEAF_QUEUE_C1:
verifyAccesibleNodeLabels(queueJson,
ImmutableSet.of(LABEL_LX, LABEL_LY));
assertEquals("Invalid default Label expression", LABEL_LX,
queueJson.getString("defaultNodeLabelExpression"));
verifyQcPartitionsCapacityInfoJson(partitionsCapsArray, 40, 100, 28,
100, 50, 75, 50, 75, 50, 60, 20, 30);
break;
case LEAF_QUEUE_C2:
verifyAccesibleNodeLabels(queueJson,
ImmutableSet.of(LABEL_LX, LABEL_LY));
assertEquals("Invalid default Label expression", LABEL_LY,
queueJson.getString("defaultNodeLabelExpression"));
verifyQcPartitionsCapacityInfoJson(partitionsCapsArray, 60, 100, 42,
100, 50, 75, 50, 75, 50, 70, 20, 35);
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
}
private void verifyQcPartitionsCapacityInfoJson(JSONArray partitionsCapsArray,
float lxCaps, float lxMaxCaps, float lxAbsCaps, float lxAbsMaxCaps,
float lyCaps, float lyMaxCaps, float lyAbsCaps, float lyAbsMaxCaps,
float defCaps, float defMaxCaps, float defAbsCaps, float defAbsMaxCaps)
throws JSONException {
assertEquals("incorrect number of partitions", CLUSTER_LABELS.size(),
partitionsCapsArray.length());
for (int j = 0; j < partitionsCapsArray.length(); j++) {
JSONObject partitionInfo = partitionsCapsArray.getJSONObject(j);
String partitionName = partitionInfo.getString("partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoJson(partitionInfo, lxCaps, 0, lxMaxCaps,
lxAbsCaps, 0, lxAbsMaxCaps);
break;
case LABEL_LY:
verifyPartitionCapacityInfoJson(partitionInfo, lyCaps, 0, lyMaxCaps,
lyAbsCaps, 0, lyAbsMaxCaps);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoJson(partitionInfo, defCaps, 0, defMaxCaps,
defAbsCaps, 0, defAbsMaxCaps);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
}
private void verifyPartitionCapacityInfoJson(
JSONObject partitionCapacityInfoJson, float capacity, float usedCapacity,
float maxCapacity, float absoluteCapacity, float absoluteUsedCapacity,
float absoluteMaxCapacity) throws JSONException {
assertEquals("capacity doesn't match", capacity,
(float) partitionCapacityInfoJson.getDouble("capacity"), EPSILON);
assertEquals("capacity doesn't match", usedCapacity,
(float) partitionCapacityInfoJson.getDouble("usedCapacity"), EPSILON);
assertEquals("capacity doesn't match", maxCapacity,
(float) partitionCapacityInfoJson.getDouble("maxCapacity"), EPSILON);
assertEquals("capacity doesn't match", absoluteCapacity,
(float) partitionCapacityInfoJson.getDouble("absoluteCapacity"), EPSILON);
assertEquals("capacity doesn't match", absoluteUsedCapacity,
(float) partitionCapacityInfoJson.getDouble("absoluteUsedCapacity"),
1e-3f);
assertEquals("capacity doesn't match", absoluteMaxCapacity,
(float) partitionCapacityInfoJson.getDouble("absoluteMaxCapacity"),
1e-3f);
}
}