TestRMWebServicesSchedulerActivities.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.TestProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.reader.NodeLabelsInfoReader;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Application;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_ALLOCATIONS;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_ALLOCATION_REQUEST_ID;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_ALLOCATION_STATE;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_DIAGNOSTIC;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_FINAL_ALLOCATION_STATE;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_NODE_ID;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_NODE_IDS;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_ACT_REQUEST_PRIORITY;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_APP_ACT_CHILDREN;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_APP_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_CHILDREN;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_NAME;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ALLOCATIONS_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_BULK_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.TOTAL_RESOURCE_INSUFFICIENT_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.getFirstSubNodeFromJson;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.getSubNodesFromJson;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocationAttempts;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocations;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfNodes;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyQueueOrder;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyStateOfAllocations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
/**
* Tests for scheduler/app activities.
*/
public class TestRMWebServicesSchedulerActivities extends JerseyTestBase {
private MockRM rm;
@Override
protected Application configure() {
ResourceConfig config = new ResourceConfig();
config.register(RMWebServices.class);
config.register(new JerseyBinder());
config.register(GenericExceptionHandler.class);
config.register(NodeLabelsInfoReader.class);
config.register(new JettisonFeature()).register(JAXBContextResolver.class);
forceSet(TestProperties.CONTAINER_PORT, JERSEY_RANDOM_PORT);
return config;
}
private class JerseyBinder extends AbstractBinder {
@Override
protected void configure() {
CapacitySchedulerConfiguration config =
createConfig(new CapacitySchedulerConfiguration(new Configuration(false)));
rm = createMockRM(config);
bind(rm).to(ResourceManager.class).named("rm");
bind(config).to(Configuration.class).named("conf");
HttpServletRequest request = mock(HttpServletRequest.class);
bind(request).to(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);
bind(response).to(HttpServletResponse.class);
}
}
public TestRMWebServicesSchedulerActivities() {
}
@BeforeEach
@Override
public void setUp() throws Exception {
super.setUp();
}
public static MockRM createMockRM(CapacitySchedulerConfiguration csConf) {
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
return new MockRM(conf);
}
public static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
config.set("yarn.scheduler.capacity.root.queues", "a, b, c");
config.set("yarn.scheduler.capacity.root.a.queues", "a1, a2");
config.set("yarn.scheduler.capacity.root.b.queues", "b1, b2, b3");
config.set("yarn.scheduler.capacity.root.a.a1.queues", "a1a, a1b, a1c");
config.set("yarn.scheduler.capacity.root.a.capacity", "10.5");
config.set("yarn.scheduler.capacity.root.a.maximum-capacity", "50");
config.set("yarn.scheduler.capacity.root.a.max-parallel-app", "42");
config.set("yarn.scheduler.capacity.root.b.capacity", "79.5");
config.set("yarn.scheduler.capacity.root.c.capacity", "10");
config.set("yarn.scheduler.capacity.root.a.a1.capacity", "30");
config.set("yarn.scheduler.capacity.root.a.a1.maximum-capacity", "50");
config.set("yarn.scheduler.capacity.root.a.a1.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.a.a2.capacity", "70");
config.set("yarn.scheduler.capacity.root.a.a2.maximum-application-lifetime", "100");
config.set("yarn.scheduler.capacity.root.a.a2.default-application-lifetime", "50");
config.set("yarn.scheduler.capacity.root.a.a2.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.b.b1.capacity", "60");
config.set("yarn.scheduler.capacity.root.b.b2.capacity", "39.5");
config.set("yarn.scheduler.capacity.root.b.b3.capacity", "0.5");
config.set("yarn.scheduler.capacity.root.b.b1.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.b.b2.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.b.b3.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.a.a1.a1a.capacity", "65");
config.set("yarn.scheduler.capacity.root.a.a1.a1b.capacity", "15");
config.set("yarn.scheduler.capacity.root.a.a1.a1c.capacity", "20");
config.set("yarn.scheduler.capacity.root.a.a1.a1c.auto-create-child-queue.enabled", "true");
config.set("yarn.scheduler.capacity.root.a.a1.a1c.leaf-queue-template.capacity", "50");
}
@Test
public void testAssignMultipleContainersPerNodeHeartbeat()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebTarget r = targetWithJsonObject();
MultivaluedMap params = new MultivaluedHashMap();
params.add("nodeId", "127.0.0.1:1234");
Response response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1:1234").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1:1234").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
// Collection logic of scheduler activities changed after YARN-9313,
// only one allocation should be recorded for all scenarios.
verifyNumberOfAllocations(json, 1);
JSONObject allocation = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
verifyStateOfAllocations(allocation,
FN_ACT_FINAL_ALLOCATION_STATE, "ALLOCATED");
verifyQueueOrder(allocation,
"root-root.a-root.c-root.b-root.b.b2-root.b.b3-root.b.b1");
} finally {
rm.stop();
}
}
@Test
public void testAssignWithoutAvailableResource() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebTarget r = targetWithJsonObject();
MultivaluedMap params = new MultivaluedHashMap();
params.add("nodeId", "127.0.0.1");
Response response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1").
request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
// verify scheduler activities
verifyNumberOfAllocations(json, 1);
JSONObject rootObj = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS)
.getJSONObject(FN_SCHEDULER_ACT_ALLOCATIONS_ROOT);
assertTrue(rootObj.optString(FN_ACT_DIAGNOSTIC).startsWith(
ActivityDiagnosticConstant.
INIT_CHECK_SINGLE_NODE_RESOURCE_INSUFFICIENT));
} finally {
rm.stop();
}
}
@Test
public void testNoNM() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
try {
//Get JSON
WebTarget r = targetWithJsonObject();
Response response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1:1234").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1:1234").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
} finally {
rm.stop();
}
}
@Test
public void testWrongNodeId() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebTarget r = targetWithJsonObject();
Response response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.0").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.0").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
} finally {
rm.stop();
}
}
@Test
public void testReserveNewContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app2")
.withUser("user1")
.withAcls(null)
.withQueue("b2")
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm, data);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
10)), null);
// Reserve new container
WebTarget r = targetWithJsonObject();
Response response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.2").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.2").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
verifyQueueOrder(allocations,
"root-root.a-root.c-root.b-root.b.b3-root.b.b1");
verifyStateOfAllocations(allocations, FN_ACT_FINAL_ALLOCATION_STATE,
"RESERVED");
// Do a node heartbeat again without releasing container from app2
r = targetWithJsonObject();
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.2").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.2").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocation = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
verifyQueueOrder(allocation, "root.b.b1");
verifyStateOfAllocations(allocation, FN_ACT_FINAL_ALLOCATION_STATE,
"RESERVED");
// Finish application 2
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ContainerId containerId = ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1);
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED);
// Do a node heartbeat again
r = targetWithJsonObject();
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.2").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.2").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
allocations = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
verifyQueueOrder(allocations, "root.b.b1");
verifyStateOfAllocations(allocations, FN_ACT_FINAL_ALLOCATION_STATE,
"ALLOCATED_FROM_RESERVED");
} finally {
rm.stop();
}
}
@Test
public void testActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
//Get JSON
WebTarget r = targetWithJsonObject();
Response response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParam("nodeId", "127.0.0.1").request(
MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocation = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
verifyStateOfAllocations(allocation, FN_ACT_FINAL_ALLOCATION_STATE,
"ALLOCATED");
// Increase number of nodes to 6 since request node has been added
verifyNumberOfNodes(allocation, 6);
verifyQueueOrder(allocation, "root-root.b-root.b.b1");
} finally {
rm.stop();
}
}
@Test
public void testAppActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
//Get JSON
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
ActivitiesTestUtils.requestWebResource(r, params);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
//Check app activities
verifyNumberOfAllocations(json, 1);
JSONObject allocation = getFirstSubNodeFromJson(json,
FN_APP_ACT_ROOT, FN_ACT_ALLOCATIONS);
verifyStateOfAllocations(allocation, FN_ACT_ALLOCATION_STATE,
"ALLOCATED");
//Check request allocation
JSONObject requestAllocationObj =
getFirstSubNodeFromJson(allocation, FN_APP_ACT_CHILDREN);
verifyStateOfAllocations(requestAllocationObj, FN_ACT_ALLOCATION_STATE,
"ALLOCATED");
assertEquals(0,
requestAllocationObj.optInt(FN_ACT_REQUEST_PRIORITY));
assertEquals(-1,
requestAllocationObj.optLong(FN_ACT_ALLOCATION_REQUEST_ID));
//Check allocation attempts
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
List<JSONObject> allocationAttempts =
getSubNodesFromJson(requestAllocationObj, FN_APP_ACT_CHILDREN);
assertEquals(1, allocationAttempts.size());
verifyStateOfAllocations(allocationAttempts.get(0),
FN_ACT_ALLOCATION_STATE, "ALLOCATED");
assertNotNull(allocationAttempts.get(0).get(FN_ACT_NODE_ID));
} finally {
rm.stop();
}
}
@Test
public void testAppAssignMultipleContainersPerNodeHeartbeat()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
ActivitiesTestUtils.requestWebResource(r, params);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 10);
List<JSONObject> allocations =
getSubNodesFromJson(json, FN_APP_ACT_ROOT, FN_ACT_ALLOCATIONS);
for (int i = 0; i < allocations.size(); i++) {
verifyStateOfAllocations(allocations.get(i),
FN_ACT_ALLOCATION_STATE, "ALLOCATED");
}
} finally {
rm.stop();
}
}
@Test
public void testAppAssignWithoutAvailableResource() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
ActivitiesTestUtils.requestWebResource(r, params);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 0);
} finally {
rm.stop();
}
}
@Test
public void testAppNoNM() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
try {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
//Get JSON
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
ActivitiesTestUtils.requestWebResource(r, params);
//Get JSON
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 0);
} finally {
rm.stop();
}
}
@Test
public void testAppReserveNewContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app2")
.withUser("user1")
.withAcls(null)
.withQueue("b2")
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm, data);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
10)), null);
// Reserve new container
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
ActivitiesTestUtils.requestWebResource(r, params);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
// Do a node heartbeat again without releasing container from app2
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 2);
// Finish application 2
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ContainerId containerId = ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1);
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED);
// Do a node heartbeat again
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 3);
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testInsufficientResourceDiagnostic() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebTarget r = targetWithJsonObject();
Response response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
assertEquals("waiting for next allocation",
getFirstSubNodeFromJson(json, FN_SCHEDULER_ACT_ROOT)
.optString(FN_ACT_DIAGNOSTIC));
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*",
Resources.createResource(5 * 1024), 1)), null);
//will reserve a container on nm1
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
// check diagnostics
Predicate<JSONObject> findReqPred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME).equals("request_-1_-1");
List<JSONObject> app2ReqObjs =
ActivitiesTestUtils.findInAllocations(allocationObj, findReqPred);
assertEquals(1, app2ReqObjs.size());
List<JSONObject> reqAllocations =
getSubNodesFromJson(app2ReqObjs.get(0), FN_SCHEDULER_ACT_CHILDREN);
assertEquals(1, reqAllocations.size());
assertTrue(reqAllocations.get(0).getString(FN_ACT_DIAGNOSTIC)
.contains(TOTAL_RESOURCE_INSUFFICIENT_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testPlacementConstraintDiagnostic() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// init scheduling request
PlacementConstraint pcExpression = PlacementConstraints
.build(PlacementConstraints.targetIn(NODE, allocationTag("foo")));
List<SchedulingRequest> schedulingRequests = new ArrayList<>();
schedulingRequests.add(ActivitiesTestUtils
.schedulingRequest(5, 1, 1, 1, 512, pcExpression, "foo"));
AllocateRequest allocateReq =
AllocateRequest.newBuilder().schedulingRequests(schedulingRequests)
.build();
am1.allocate(allocateReq);
WebTarget r = targetWithJsonObject();
Response response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
assertEquals("waiting for next allocation",
getFirstSubNodeFromJson(json, FN_SCHEDULER_ACT_ROOT)
.optString(FN_ACT_DIAGNOSTIC));
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = getFirstSubNodeFromJson(json,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS);
// check diagnostics
Predicate<JSONObject> findReqPred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME).equals("request_1_1");
List<JSONObject> reqObjs =
ActivitiesTestUtils.findInAllocations(allocationObj, findReqPred);
assertEquals(1, reqObjs.size());
JSONObject reqChild =
getFirstSubNodeFromJson(reqObjs.get(0), FN_SCHEDULER_ACT_CHILDREN);
assertTrue(reqChild.getString(FN_ACT_DIAGNOSTIC)
.contains(UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testAppInsufficientResourceDiagnostic() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// am1 asks for 1 * 5GB container
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*",
Resources.createResource(5 * 1024), 1)), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = getFirstSubNodeFromJson(json,
FN_APP_ACT_ROOT, FN_ACT_ALLOCATIONS);
JSONObject requestAllocationObj =
getFirstSubNodeFromJson(allocationObj, FN_APP_ACT_CHILDREN);
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
JSONObject allocationAttemptObj = getFirstSubNodeFromJson(
requestAllocationObj, FN_APP_ACT_CHILDREN);
verifyStateOfAllocations(allocationAttemptObj, FN_ACT_ALLOCATION_STATE,
"SKIPPED");
assertTrue(allocationAttemptObj.optString(FN_ACT_DIAGNOSTIC)
.contains(TOTAL_RESOURCE_INSUFFICIENT_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testAppPlacementConstraintDiagnostic() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// am1 asks for 1 * 5GB container with PC expression: in,node,foo
PlacementConstraint pcExpression = PlacementConstraints
.build(PlacementConstraints.targetIn(NODE, allocationTag("foo")));
List<SchedulingRequest> schedulingRequests = new ArrayList<>();
schedulingRequests.add(ActivitiesTestUtils
.schedulingRequest(5, 1, 1, 1, 512, pcExpression, "foo"));
AllocateRequest allocateReq =
AllocateRequest.newBuilder().schedulingRequests(schedulingRequests)
.build();
am1.allocate(allocateReq);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = getFirstSubNodeFromJson(json,
FN_APP_ACT_ROOT, FN_ACT_ALLOCATIONS);
JSONObject requestAllocationObj =
getFirstSubNodeFromJson(allocationObj, FN_APP_ACT_CHILDREN);
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
JSONObject allocationAttemptObj = getFirstSubNodeFromJson(
requestAllocationObj, FN_APP_ACT_CHILDREN);
verifyStateOfAllocations(allocationAttemptObj, FN_ACT_ALLOCATION_STATE,
"SKIPPED");
assertTrue(allocationAttemptObj.optString(FN_ACT_DIAGNOSTIC)
.contains(UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testAppFilterByRequestPrioritiesAndAllocationRequestIds()
throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// am1 asks for 1 * 1GB container with requestPriority=-1
// and allocationRequestId=1
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.allocationRequestId(1).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// am1 asks for 1 * 1GB container with requestPriority=-1
// and allocationRequestId=2
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.allocationRequestId(2).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// am1 asks for 1 * 1GB container with requestPriority=0
// and allocationRequestId=1
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.newInstance(0))
.allocationRequestId(1).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// am1 asks for 1 * 1GB container with requestPriority=0
// and allocationRequestId=3
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.newInstance(0))
.allocationRequestId(3).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// query app activities with requestPriorities={0,-1}
MultivaluedMap filterParams1 = new MultivaluedHashMap(params);
filterParams1.add(RMWSConsts.REQUEST_PRIORITIES, "0,-1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams1);
verifyNumberOfAllocations(json, 4);
// query app activities with requestPriorities=-1
MultivaluedMap filterParams2 = new MultivaluedHashMap(params);
filterParams2.add(RMWSConsts.REQUEST_PRIORITIES, "-1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams2);
verifyNumberOfAllocations(json, 2);
JSONArray allocations =
json.getJSONObject(FN_APP_ACT_ROOT).getJSONArray(FN_ACT_ALLOCATIONS);
for (int i=0; i<allocations.length(); i++) {
assertEquals("-1", getFirstSubNodeFromJson(allocations.getJSONObject(i),
FN_APP_ACT_CHILDREN).optString(FN_ACT_REQUEST_PRIORITY));
}
// query app activities with allocationRequestId=1
MultivaluedMap filterParams3 = new MultivaluedHashMap(params);
filterParams3.add(RMWSConsts.ALLOCATION_REQUEST_IDS, "1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams3);
verifyNumberOfAllocations(json, 2);
allocations =
json.getJSONObject(FN_APP_ACT_ROOT).getJSONArray(FN_ACT_ALLOCATIONS);
for (int i = 0; i < allocations.length(); i++) {
assertEquals("1", getFirstSubNodeFromJson(allocations.getJSONObject(i),
FN_APP_ACT_CHILDREN).optString(FN_ACT_ALLOCATION_REQUEST_ID));
}
// query app activities with requestPriorities=0 and allocationRequestId=1
MultivaluedMap filterParams4 = new MultivaluedHashMap(params);
filterParams4.add(RMWSConsts.REQUEST_PRIORITIES, "0");
filterParams4.add(RMWSConsts.ALLOCATION_REQUEST_IDS, "1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams4);
verifyNumberOfAllocations(json, 1);
JSONObject allocation = getFirstSubNodeFromJson(json,
FN_APP_ACT_ROOT, FN_ACT_ALLOCATIONS);
JSONObject request =
getFirstSubNodeFromJson(allocation, FN_APP_ACT_CHILDREN);
assertEquals("0", request.optString(FN_ACT_REQUEST_PRIORITY));
assertEquals("1", request.optString(FN_ACT_ALLOCATION_REQUEST_ID));
// query app activities with requestPriorities=-1
// and allocationRequestId={1,2}
MultivaluedMap filterParams5 = new MultivaluedHashMap(params);
filterParams5.add(RMWSConsts.REQUEST_PRIORITIES, "-1");
filterParams5.add(RMWSConsts.ALLOCATION_REQUEST_IDS, "1,2");
json = ActivitiesTestUtils.requestWebResource(r, filterParams5);
verifyNumberOfAllocations(json, 2);
allocations =
json.getJSONObject(FN_APP_ACT_ROOT).getJSONArray(FN_ACT_ALLOCATIONS);
for (int i = 0; i < allocations.length(); i++) {
assertEquals("-1", getFirstSubNodeFromJson(allocations.getJSONObject(i),
FN_APP_ACT_CHILDREN).optString(FN_ACT_REQUEST_PRIORITY));
}
// query app activities with requestPriorities=-1
// and allocationRequestId={-1,1}
MultivaluedMap filterParams6 = new MultivaluedHashMap(params);
filterParams6.add(RMWSConsts.REQUEST_PRIORITIES, "-1");
filterParams6.add(RMWSConsts.ALLOCATION_REQUEST_IDS, "-1,1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams6);
verifyNumberOfAllocations(json, 1);
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testAppLimit() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// am1 asks for 1 * 5GB container
am1.allocate("*", 5120, 1, new ArrayList<>());
// trigger scheduling triple, there will be 3 app activities in cache
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// query all app activities without limit
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 3);
// query all app activities with limit > 3
params.putSingle(RMWSConsts.LIMIT, "10");
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 3);
// query app activities with limit = 2
params.putSingle(RMWSConsts.LIMIT, "2");
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 2);
// query app activities with limit = 1
params.putSingle(RMWSConsts.LIMIT, "1");
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
// query all app activities with invalid limit
params.putSingle(RMWSConsts.LIMIT, "STRING");
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("limit must be integer!",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// query all app activities with limit = 0
params.putSingle(RMWSConsts.LIMIT, "0");
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("limit must be greater than 0!",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// query all app activities with limit < 0
params.putSingle(RMWSConsts.LIMIT, "-3");
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("limit must be greater than 0!",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testAppActions() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// am1 asks for 10 * 512MB container
am1.allocate("*", 512, 10, new ArrayList<>());
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
params.add("maxTime", 1); //only last for 1 second
// testing invalid action
params.add(RMWSConsts.ACTIONS, "get,invalid-action");
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertTrue(json.getJSONObject(FN_APP_ACT_ROOT)
.getString(FN_ACT_DIAGNOSTIC).startsWith("Got invalid action"));
/*
* testing get action
*/
params.putSingle(RMWSConsts.ACTIONS, "get");
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// app activities won't be recorded
params.putSingle(RMWSConsts.ACTIONS, "get");
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
/*
* testing update action
*/
params.putSingle(RMWSConsts.ACTIONS, "refresh");
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("Successfully received action: refresh",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
Thread.sleep(1000);
// app activities should be recorded
params.putSingle(RMWSConsts.ACTIONS, "get");
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
Thread.sleep(1000);
/*
* testing update and get actions
*/
params.remove(RMWSConsts.ACTIONS);
params.add(RMWSConsts.ACTIONS, "refresh,get");
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
Thread.sleep(1000);
// more app activities should be recorded
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 2);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
Thread.sleep(1000);
// more app activities should be recorded
json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 3);
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testAppSummary() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 4 * 1024);
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 4 * 1024);
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(5120, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
MultivaluedMap params = new MultivaluedHashMap();
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getJSONObject(FN_APP_ACT_ROOT).getString(FN_ACT_DIAGNOSTIC));
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// am1 asks for 1 * 5GB container
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(0), "*",
Resources.createResource(5 * 1024), 1)), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm3.getNodeId())));
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
params.add(RMWSConsts.SUMMARIZE, "true");
params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC);
json = ActivitiesTestUtils.requestWebResource(r, params);
// verify that response contains an allocation summary for all nodes
verifyNumberOfAllocations(json, 1);
JSONObject allocation = getFirstSubNodeFromJson(json,
FN_APP_ACT_ROOT, FN_ACT_ALLOCATIONS);
JSONObject reqestAllocation =
getFirstSubNodeFromJson(allocation, FN_APP_ACT_CHILDREN);
JSONArray attempts = reqestAllocation.getJSONArray(FN_APP_ACT_CHILDREN);
assertEquals(2, attempts.length());
for (int i = 0; i < attempts.length(); i++) {
JSONObject attempt = attempts.getJSONObject(i);
if (attempt.getString(FN_ACT_ALLOCATION_STATE)
.equals(ActivityState.SKIPPED.name())) {
assertEquals(2, attempt.getJSONArray(FN_ACT_NODE_IDS).length());
} else if (attempt.getString(FN_ACT_ALLOCATION_STATE)
.equals(ActivityState.RESERVED.name())) {
Object obj = attempt.get(FN_ACT_NODE_IDS);
JSONArray jsonArray = new JSONArray();
if (obj instanceof JSONArray) {
jsonArray = attempt.getJSONArray(FN_ACT_NODE_IDS);
} else {
jsonArray.put(obj);
}
assertEquals(1, jsonArray.length());
assertEquals(nm1.getNodeId().toString(), jsonArray.getString(0));
}
}
} finally {
rm.stop();
}
}
@Test
public void testNodeSkippedBecauseOfRelaxLocality() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("b1")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.resourceName("127.0.0.2")
.capability(Resources.createResource(1024)).numContainers(1)
.build(),
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.resourceName("/default-rack")
.capability(Resources.createResource(1024)).numContainers(1)
.relaxLocality(false)
.build(),
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.resourceName("*")
.capability(Resources.createResource(1024)).numContainers(1)
.relaxLocality(false)
.build()), null);
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
ActivitiesTestUtils.requestWebResource(r, null);
WebTarget sr = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(RMWSConsts.SCHEDULER_ACTIVITIES);
ActivitiesTestUtils.requestWebResource(sr, null);
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
JSONObject appActivitiesJson =
ActivitiesTestUtils.requestWebResource(r, null);
JSONObject schedulerActivitiesJson =
ActivitiesTestUtils.requestWebResource(sr, null);
// verify app activities
verifyNumberOfAllocations(appActivitiesJson, 1);
List<JSONObject> allocationAttempts = ActivitiesTestUtils
.getSubNodesFromJson(appActivitiesJson, FN_APP_ACT_ROOT,
FN_ACT_ALLOCATIONS, FN_APP_ACT_CHILDREN, FN_APP_ACT_CHILDREN);
assertEquals(1, allocationAttempts.size());
assertEquals(
ActivityDiagnosticConstant.NODE_SKIPPED_BECAUSE_OF_RELAX_LOCALITY,
allocationAttempts.get(0).optString(FN_ACT_DIAGNOSTIC));
/*
* verify scheduler activities
*/
verifyNumberOfAllocations(schedulerActivitiesJson, 1);
// verify request activity
Predicate<JSONObject> findA1AQueuePred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME).equals("request_-1_-1");
List<JSONObject> reqObjs = ActivitiesTestUtils.findInAllocations(
getFirstSubNodeFromJson(schedulerActivitiesJson,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS),
findA1AQueuePred);
assertEquals(1, reqObjs.size());
assertEquals(ActivityState.SKIPPED.name(),
reqObjs.get(0).optString(FN_ACT_ALLOCATION_STATE));
// verify node activity
JSONObject nodeObj =
getFirstSubNodeFromJson(reqObjs.get(0), FN_SCHEDULER_ACT_CHILDREN);
assertEquals(nm1.getNodeId().toString(),
nodeObj.optString(FN_ACT_NODE_ID));
assertEquals(
ActivityDiagnosticConstant.NODE_SKIPPED_BECAUSE_OF_RELAX_LOCALITY,
nodeObj.optString(FN_ACT_DIAGNOSTIC));
} finally {
rm.stop();
}
}
@Test
public void testQueueSkippedBecauseOfHeadroom() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(10, rm)
.withAppName("app1")
.withUser("user1")
.withAcls(null)
.withQueue("a1a")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.resourceName("*").capability(Resources.createResource(3072))
.numContainers(1).relaxLocality(false).build()), null);
WebTarget r = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
app1.getApplicationId().toString()));
ActivitiesTestUtils.requestWebResource(r, null);
WebTarget sr = target().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(RMWSConsts.SCHEDULER_ACTIVITIES);
ActivitiesTestUtils.requestWebResource(sr, null);
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
JSONObject appActivitiesJson =
ActivitiesTestUtils.requestWebResource(r, null);
JSONObject schedulerActivitiesJson =
ActivitiesTestUtils.requestWebResource(sr, null);
// verify app activities: diagnostic should be attached at request level
// and there should be no allocation attempts at node level
verifyNumberOfAllocations(appActivitiesJson, 1);
List<JSONObject> requestAllocations = ActivitiesTestUtils
.getSubNodesFromJson(appActivitiesJson, FN_APP_ACT_ROOT,
FN_ACT_ALLOCATIONS, FN_APP_ACT_CHILDREN);
assertEquals(1, requestAllocations.size());
assertEquals(ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM,
requestAllocations.get(0).optString(FN_ACT_DIAGNOSTIC));
assertFalse(requestAllocations.get(0).has(FN_APP_ACT_CHILDREN));
// verify scheduler activities: diagnostic should be attached at request
// level and queue level
verifyNumberOfAllocations(schedulerActivitiesJson, 1);
// verify at queue level
Predicate<JSONObject> findA1AQueuePred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME).equals("root.a.a1.a1a");
List<JSONObject> a1aQueueObj = ActivitiesTestUtils.findInAllocations(
getFirstSubNodeFromJson(schedulerActivitiesJson,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS), findA1AQueuePred);
assertEquals(1, a1aQueueObj.size());
assertEquals(ActivityState.REJECTED.name(),
a1aQueueObj.get(0).optString(FN_ACT_ALLOCATION_STATE));
assertTrue(a1aQueueObj.get(0).optString(FN_ACT_DIAGNOSTIC).startsWith(
ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM));
// verify at request level
Predicate<JSONObject> findReqPred =
(obj) -> obj.optString(FN_SCHEDULER_ACT_NAME).equals("request_-1_-1");
List<JSONObject> reqObj = ActivitiesTestUtils.findInAllocations(
getFirstSubNodeFromJson(schedulerActivitiesJson,
FN_SCHEDULER_ACT_ROOT, FN_ACT_ALLOCATIONS), findReqPred);
assertEquals(1, reqObj.size());
assertEquals(ActivityState.REJECTED.name(),
reqObj.get(0).optString(FN_ACT_ALLOCATION_STATE));
assertTrue(reqObj.get(0).optString(FN_ACT_DIAGNOSTIC).startsWith(
ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM));
} finally {
rm.stop();
}
}
@Test
@Timeout(value = 30)
public void testSchedulerBulkActivities() throws Exception {
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
MockNM[] nms = new MockNM[] {nm1, nm2};
try {
// Validate if response has 5 node activities
int expectedCount = 5;
RESTClient restClient = new RESTClient(5);
restClient.start();
sendHeartbeat(restClient, nms);
JSONObject activitiesJson = restClient.getOutput().getJSONObject(
FN_SCHEDULER_BULK_ACT_ROOT);
Object activities = activitiesJson.get(FN_SCHEDULER_ACT_ROOT);
assertEquals(expectedCount,
((JSONArray) activities).length(), "Number of activities is wrong");
// Validate if response does not exceed max 500
expectedCount = 1000;
restClient = new RESTClient(expectedCount);
restClient.start();
sendHeartbeat(restClient, nms);
activitiesJson = restClient.getOutput().getJSONObject(
FN_SCHEDULER_BULK_ACT_ROOT);
activities = activitiesJson.get(FN_SCHEDULER_ACT_ROOT);
assertEquals(RMWebServices.MAX_ACTIVITIES_COUNT,
((JSONArray) activities).length(), "Max Activities Limit does not work");
} finally {
rm.stop();
}
}
private class RESTClient extends Thread {
private int expectedCount;
private boolean done = false;
private JSONObject json;
RESTClient(int expectedCount) {
this.expectedCount = expectedCount;
}
boolean isDone() {
return done;
}
JSONObject getOutput() {
return json;
}
@Override
public void run() {
WebTarget r = targetWithJsonObject();
Response response = r.path("ws").path("v1").path("cluster")
.path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES)
.queryParam(RMWSConsts.ACTIVITIES_COUNT, expectedCount)
.request(MediaType.APPLICATION_JSON).get(Response.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";"
+ JettyUtils.UTF_8, response.getMediaType().toString());
json = response.readEntity(JSONObject.class);
done = true;
}
}
private void sendHeartbeat(RESTClient restClient, MockNM[] nms)
throws Exception {
GenericTestUtils.waitFor(() -> {
try {
for (MockNM nm : nms) {
nm.nodeHeartbeat(true);
}
} catch (Exception e) {
return false;
}
return restClient.isDone();
}, 10, 20000);
}
private CapacitySchedulerConfiguration createConfig(CapacitySchedulerConfiguration config) {
config.set("yarn.scheduler.capacity.root.queues", "a, b, c");
config.set("yarn.scheduler.capacity.root.a.queues", "a1, a2");
config.set("yarn.scheduler.capacity.root.b.queues", "b1, b2, b3");
config.set("yarn.scheduler.capacity.root.a.a1.queues", "a1a, a1b, a1c");
config.set("yarn.scheduler.capacity.root.a.capacity", "10.5");
config.set("yarn.scheduler.capacity.root.a.maximum-capacity", "50");
config.set("yarn.scheduler.capacity.root.a.max-parallel-app", "42");
config.set("yarn.scheduler.capacity.root.b.capacity", "79.5");
config.set("yarn.scheduler.capacity.root.c.capacity", "10");
config.set("yarn.scheduler.capacity.root.a.a1.capacity", "30");
config.set("yarn.scheduler.capacity.root.a.a1.maximum-capacity", "50");
config.set("yarn.scheduler.capacity.root.a.a1.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.a.a2.capacity", "70");
config.set("yarn.scheduler.capacity.root.a.a2.maximum-application-lifetime", "100");
config.set("yarn.scheduler.capacity.root.a.a2.default-application-lifetime", "50");
config.set("yarn.scheduler.capacity.root.a.a2.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.b.b1.capacity", "60");
config.set("yarn.scheduler.capacity.root.b.b2.capacity", "39.5");
config.set("yarn.scheduler.capacity.root.b.b3.capacity", "0.5");
config.set("yarn.scheduler.capacity.root.b.b1.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.b.b2.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.b.b3.user-limit-factor", "100");
config.set("yarn.scheduler.capacity.root.a.a1.a1a.capacity", "65");
config.set("yarn.scheduler.capacity.root.a.a1.a1b.capacity", "15");
config.set("yarn.scheduler.capacity.root.a.a1.a1c.capacity", "20");
config.set("yarn.scheduler.capacity.root.a.a1.a1c.auto-create-child-queue.enabled", "true");
config.set("yarn.scheduler.capacity.root.a.a1.a1c.leaf-queue-template.capacity", "50");
return config;
}
}