TestRMWebServicesReservation.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestWebServiceUtil.toJson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.URL;
import java.security.Principal;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Properties;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.jettison.JettisonJaxbContext;
import org.glassfish.jersey.jettison.JettisonUnmarshaller;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.TestProperties;
@RunWith(Parameterized.class)
public class TestRMWebServicesReservation extends JerseyTestBase {
// User name appended as the "user.name" query parameter and returned by the
// mocked request principal when the authentication filter is enabled.
private String webserviceUserName = "testuser";
// Toggled by the get*Injector*() helpers; read by configure() and
// isAuthenticationEnabled().
private static boolean setAuthFilter = false;
// Set from the second test parameter; when true, submitted reservations carry
// a "recurrence-expression" entry.
private static boolean enableRecurrence = false;
// MockRM instance created in JerseyBinder#configure() and stopped in
// tearDown().
private static MockRM rm;
// Offset added to the current time to build arrival times, and added to the
// arrival to build the third argument of the submission template.
// NOTE(review): presumably milliseconds -- confirm against the reservation API.
private static final int MINIMUM_RESOURCE_DURATION = 100000;
// Time source for all arrival timestamps and fallback reservation ids.
private static final Clock clock = new UTCClock();
// NOTE(review): 86400000 equals the number of milliseconds in a day --
// presumably the maximum recurrence period; confirm.
private static final int MAXIMUM_PERIOD = 86400000;
// Value substituted into the "recurrence-expression" when recurrence is on.
private static final int DEFAULT_RECURRENCE = MAXIMUM_PERIOD / 10;
// Directory taken from the "test.build.data" system property ("/tmp" when
// the property is unset).
private static final String TEST_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
// Location the FairScheduler allocation file is written to.
private static final String FS_ALLOC_FILE = new File(TEST_DIR,
"test-fs-queues.xml").getAbsolutePath();
// This is what is used in the test resource files.
private static final String DEFAULT_QUEUE = "dedicated";
// Paths relative to ws/v1/cluster (see constructWebResource).
private static final String LIST_RESERVATION_PATH = "reservation/list";
private static final String GET_NEW_RESERVATION_PATH =
"reservation/new-reservation";
// Jettison unmarshaller used to turn the submission JSON template into a
// ReservationSubmissionRequestInfo.
private static JettisonUnmarshaller reservationSubmissionRequestInfoReader;
// Build the unmarshaller once per class; a JAXB failure is rethrown unchecked
// so that class initialization fails.
static {
try {
JettisonJaxbContext jettisonJaxbContext = new JettisonJaxbContext(
ReservationSubmissionRequestInfo.class);
reservationSubmissionRequestInfoReader = jettisonJaxbContext.createJsonUnmarshaller();
} catch (JAXBException e) {
throw new RuntimeException(e);
}
}
// Jersey resource configuration; assigned in configure() and extended with a
// binder in the constructor.
private ResourceConfig config;
// Mocked servlet request/response bound for injection by JerseyBinder.
private HttpServletRequest hsRequest = mock(HttpServletRequest.class);
private HttpServletResponse hsResponse = mock(HttpServletResponse.class);
/**
 * Helper filter that allows testing of RM web services which require
 * authorization. {@code configure()} registers this class with the Jersey
 * {@code ResourceConfig} whenever the auth filter is enabled.
 */
public static class TestRMCustomAuthFilter extends AuthenticationFilter {

  /**
   * Copies every init parameter whose name starts with {@code configPrefix}
   * (with the prefix stripped), then sets the auth type to "simple" and
   * anonymous access to "false".
   *
   * @param configPrefix prefix selecting the relevant init parameters
   * @param filterConfig source of the init parameters
   * @return the properties handed to the authentication filter
   */
  @Override
  protected Properties getConfiguration(String configPrefix,
      FilterConfig filterConfig) throws ServletException {
    final Properties filterProps = new Properties();
    for (Enumeration<?> params = filterConfig.getInitParameterNames();
        params.hasMoreElements();) {
      final String paramName = (String) params.nextElement();
      if (!paramName.startsWith(configPrefix)) {
        continue;
      }
      final String paramValue = filterConfig.getInitParameter(paramName);
      filterProps.put(paramName.substring(configPrefix.length()), paramValue);
    }
    filterProps.put(AuthenticationFilter.AUTH_TYPE, "simple");
    filterProps.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
    return filterProps;
  }
}
/**
 * Builds the Jersey application under test: the RM web services, the generic
 * exception handler, optionally the test auth filter, Jettison support and
 * the JAXB context resolver. Also forces the container port setting.
 *
 * @return the resource configuration, also stored in {@code config}
 */
@Override
protected Application configure() {
  final ResourceConfig resourceConfig = new ResourceConfig();
  config = resourceConfig;
  resourceConfig
      .register(RMWebServices.class)
      .register(GenericExceptionHandler.class);
  if (setAuthFilter) {
    resourceConfig.register(TestRMCustomAuthFilter.class);
  }
  resourceConfig.register(new JettisonFeature());
  resourceConfig.register(JAXBContextResolver.class);
  forceSet(TestProperties.CONTAINER_PORT, JERSEY_RANDOM_PORT);
  return resourceConfig;
}
/**
 * Base binder: prepares the YARN configuration, lets subclasses pick a
 * scheduler through {@link #configureScheduler()}, creates the MockRM and
 * binds the RM, the configuration and the mocked servlet objects.
 */
private class JerseyBinder extends AbstractBinder {
  private Configuration conf = new YarnConfiguration();

  public Configuration getConf() {
    return conf;
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /** Scheduler hook; does nothing here, subclasses override it. */
  public void configureScheduler() {
  }

  @Override
  protected void configure() {
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
    configureScheduler();
    // configureScheduler() may have swapped the configuration via setConf(),
    // so the field is read only after the hook has run.
    final Configuration effectiveConf = conf;
    rm = new MockRM(effectiveConf);
    bind(rm).to(ResourceManager.class).named("rm");
    bind(effectiveConf).to(Configuration.class).named("conf");
    bind(hsRequest).to(HttpServletRequest.class);
    bind(hsResponse).to(HttpServletResponse.class);
  }
}
/**
 * Binder that runs the MockRM on the CapacityScheduler with two queues,
 * "default" and "dedicated" (50% each), where "dedicated" is reservable.
 */
private class CapTestServletModule extends JerseyBinder {

  /**
   * @param flag when true, ACLs are enabled and the admin ACL is restricted
   *             to "testuser1"
   */
  CapTestServletModule(boolean flag) {
    if (flag) {
      getConf().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
      // set the admin acls otherwise all users are considered admins
      // and we can't test authorization
      getConf().setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
    }
  }

  @Override
  public void configureScheduler() {
    // setClass() stores the class name under RM_SCHEDULER itself, so a
    // preceding set() of the same key is not needed.
    getConf().setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    CapacitySchedulerConfiguration csconf =
        new CapacitySchedulerConfiguration(getConf());
    String[] queues = {"default", "dedicated"};
    QueuePath dedicatedQueuePath = new QueuePath("root.dedicated");
    csconf.setQueues(new QueuePath("root"), queues);
    csconf.setCapacity(new QueuePath("root.default"), 50.0f);
    csconf.setCapacity(dedicatedQueuePath, 50.0f);
    csconf.setReservable(dedicatedQueuePath, true);
    // Replace the binder's configuration so the MockRM is created from the
    // capacity-scheduler configuration.
    setConf(csconf);
  }
}
/**
 * Binder that runs the MockRM on the FairScheduler, using an allocation file
 * written to {@code FS_ALLOC_FILE} with a "default" queue and a reservable
 * "dedicated" queue under "root".
 */
private class FairTestServletModule extends JerseyBinder {

  /**
   * @param flag when true, ACLs are enabled and the admin ACL is restricted
   *             to "testuser1"
   */
  FairTestServletModule(boolean flag) {
    if (!flag) {
      return;
    }
    final Configuration moduleConf = getConf();
    moduleConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
    // Without an explicit admin ACL every user would count as an admin,
    // which would make authorization untestable.
    moduleConf.setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
  }

  @Override
  public void configureScheduler() {
    final AllocationFileQueue defaultQueue =
        new AllocationFileQueue.Builder("default")
            .aclAdministerApps("someuser ")
            .build();
    final AllocationFileQueue dedicatedQueue =
        new AllocationFileQueue.Builder("dedicated")
            .reservation()
            .aclAdministerApps("someuser ")
            .build();
    final AllocationFileQueue rootQueue =
        new AllocationFileQueue.Builder("root")
            .aclAdministerApps("someuser ")
            .subQueue(defaultQueue)
            .subQueue(dedicatedQueue)
            .build();
    AllocationFileWriter.create()
        .drfDefaultQueueSchedulingPolicy()
        .addQueue(rootQueue)
        .writeToFile(FS_ALLOC_FILE);

    final Configuration moduleConf = getConf();
    moduleConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
    moduleConf.set(YarnConfiguration.RM_SCHEDULER,
        FairScheduler.class.getName());
  }
}
/**
 * Marks the auth filter as disabled and returns a CapacityScheduler binder
 * built without the ACL settings.
 */
private CapTestServletModule getNoAuthInjectorCap() {
setAuthFilter = false;
return new CapTestServletModule(false);
}
/**
 * Marks the auth filter as enabled and returns a CapacityScheduler binder
 * built with the ACL settings.
 */
private CapTestServletModule getSimpleAuthInjectorCap() {
setAuthFilter = true;
return new CapTestServletModule(true);
}
/**
 * Marks the auth filter as disabled and returns a FairScheduler binder built
 * without the ACL settings.
 */
private FairTestServletModule getNoAuthInjectorFair() {
setAuthFilter = false;
return new FairTestServletModule(false);
}
/**
 * Marks the auth filter as enabled and returns a FairScheduler binder built
 * with the ACL settings.
 */
private FairTestServletModule getSimpleAuthInjectorFair() {
setAuthFilter = true;
return new FairTestServletModule(true);
}
/**
 * Supplies the test parameters: each run index 0..3 (see the constructor's
 * switch) paired first with recurrence enabled, then with it disabled.
 *
 * @return eight {@code {run, recurrence}} pairs
 */
@Parameters
public static Collection<Object[]> guiceConfigs() {
  final int runCount = 4;
  final boolean[] recurrenceModes = {true, false};
  final Object[][] combinations =
      new Object[runCount * recurrenceModes.length][];
  int slot = 0;
  for (boolean recurrence : recurrenceModes) {
    for (int run = 0; run < runCount; run++) {
      combinations[slot++] = new Object[] {run, recurrence};
    }
  }
  return Arrays.asList(combinations);
}
/**
 * Runs before each test and delegates entirely to the superclass setup.
 */
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
public TestRMWebServicesReservation(int run, boolean recurrence) {
enableRecurrence = recurrence;
switch (run) {
case 0:
default:
// No Auth Capacity Scheduler
config.register(getNoAuthInjectorCap());
break;
case 1:
// Simple Auth Capacity Scheduler
config.register(getSimpleAuthInjectorCap());
break;
case 2:
// No Auth Fair Scheduler
config.register(getNoAuthInjectorFair());
break;
case 3:
// Simple Auth Fair Scheduler
config.register(getSimpleAuthInjectorFair());
break;
}
}
/**
 * @return the current value of the static {@code setAuthFilter} flag, which
 *         the get*Injector*() helpers set
 */
private boolean isAuthenticationEnabled() {
return setAuthFilter;
}
/**
 * Appends each path segment to {@code target} and, when authentication is
 * enabled, adds the "user.name" query parameter for the test user.
 *
 * @param target base target to extend
 * @param paths path segments appended in order
 * @return the resulting target
 */
private WebTarget constructWebResource(WebTarget target, String... paths) {
  WebTarget resolved = target;
  for (int i = 0; i < paths.length; i++) {
    resolved = resolved.path(paths[i]);
  }
  return isAuthenticationEnabled()
      ? resolved.queryParam("user.name", webserviceUserName)
      : resolved;
}
/**
 * Builds a target rooted at {@code ws/v1/cluster} and extends it with the
 * given path segments via the two-argument overload.
 *
 * @param paths path segments below {@code ws/v1/cluster}
 * @return the resulting target
 */
private WebTarget constructWebResource(String... paths) {
  final WebTarget clusterRoot = targetWithJsonObject()
      .path("ws")
      .path("v1")
      .path("cluster");
  return constructWebResource(clusterRoot, paths);
}
/**
 * Stops the MockRM (if one was created) and then tears down the Jersey test
 * container. The superclass teardown runs in a {@code finally} block so it is
 * not skipped when stopping the RM throws.
 */
@After
@Override
public void tearDown() throws Exception {
  try {
    if (rm != null) {
      rm.stop();
    }
  } finally {
    super.tearDown();
  }
}
/**
 * Submits one reservation on a 100-node cluster. With authentication enabled
 * the response must be a success and {@code verifyReservationCount(1)} must
 * pass.
 */
@Test
public void testSubmitReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId reservationId = getReservationIdTestHelper(1);
  final Response submitResponse = reservationSubmissionTestHelper(
      "reservation/submit", MediaType.APPLICATION_JSON, reservationId);
  if (isAuthenticationEnabled()) {
    assertTrue(isHttpSuccessResponse(submitResponse));
    verifyReservationCount(1);
  }
  rm.stop();
}
/**
 * Submits the very same reservation (same id, arrival and name) twice. With
 * authentication enabled both submissions must succeed and
 * {@code verifyReservationCount(1)} must pass afterwards.
 */
@Test
public void testSubmitDuplicateReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId reservationId = getReservationIdTestHelper(1);
  final long arrival = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  final boolean authEnabled = isAuthenticationEnabled();
  // First and second submission are identical; each one must be accepted.
  for (int attempt = 0; attempt < 2; attempt++) {
    final Response submitResponse = reservationSubmissionTestHelper(
        "reservation/submit", MediaType.APPLICATION_JSON, arrival, "",
        reservationId);
    if (authEnabled) {
      assertTrue(isHttpSuccessResponse(submitResponse));
    }
  }
  if (authEnabled) {
    verifyReservationCount(1);
  }
  rm.stop();
}
/**
 * Submits a reservation and then a second one with the same id but a later
 * arrival time. With authentication enabled the first must succeed, the
 * second must be rejected, and {@code verifyReservationCount(1)} must pass.
 */
@Test
public void testSubmitDifferentReservationWithSameId() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId sharedId = getReservationIdTestHelper(1);
  final long firstArrival = clock.getTime() + MINIMUM_RESOURCE_DURATION;

  final Response firstResponse = reservationSubmissionTestHelper(
      "reservation/submit", MediaType.APPLICATION_JSON, firstArrival,
      "res1", sharedId);
  // The initial submission has to be accepted.
  if (isAuthenticationEnabled()) {
    assertTrue(isHttpSuccessResponse(firstResponse));
  }

  // Same id, different definition (shifted arrival).
  final long shiftedArrival = firstArrival + MINIMUM_RESOURCE_DURATION;
  final Response secondResponse = reservationSubmissionTestHelper(
      "reservation/submit", MediaType.APPLICATION_JSON, shiftedArrival,
      "res1", sharedId);
  // The conflicting submission has to be refused.
  if (isAuthenticationEnabled()) {
    assertTrue(!isHttpSuccessResponse(secondResponse));
    verifyReservationCount(1);
  }
  rm.stop();
}
/**
 * Submits a reservation against a one-node cluster and expects a
 * non-success response regardless of the authentication mode.
 */
@Test
public void testFailedSubmitReservation() throws Exception {
  rm.start();
  // A single node is too small to accept the reservation.
  setupCluster(1);
  final ReservationId reservationId = getReservationIdTestHelper(1);
  final Response submitResponse = reservationSubmissionTestHelper(
      "reservation/submit", MediaType.APPLICATION_JSON, reservationId);
  assertTrue(!isHttpSuccessResponse(submitResponse));
  rm.stop();
}
/**
 * Submits a reservation (which must succeed when authentication is enabled)
 * and then runs the update helper against the same reservation id.
 */
@Test
public void testUpdateReservation() throws Exception {
  rm.start();
  setupCluster(100);
  ReservationId rid = getReservationIdTestHelper(1);
  Response response = reservationSubmissionTestHelper(
      "reservation/submit", MediaType.APPLICATION_JSON, rid);
  if (this.isAuthenticationEnabled()) {
    assertTrue(isHttpSuccessResponse(response));
  }
  updateReservationTestHelper("reservation/update", rid,
      MediaType.APPLICATION_JSON);
  rm.stop();
}
/**
 * Lists reservations with both start-time and end-time set. The query window
 * ends before the arrival of res_2, so only res_1 is expected in the result.
 */
@Test
public void testTimeIntervalRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  ReservationId id1 = getReservationIdTestHelper(1);
  ReservationId id2 = getReservationIdTestHelper(2);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time, "res_1", id1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
      "res_2", id2);
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("start-time", Long.toString((long) (time * 0.9)))
      .queryParam("end-time",
          Long.toString(time + (long) (0.9 * MINIMUM_RESOURCE_DURATION)))
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  JSONObject json = testListReservationHelper(target);
  // NOTE(review): presumably the list helper yields null when the request is
  // rejected without authentication -- confirm against the helper.
  if (!this.isAuthenticationEnabled() && json == null) {
    return;
  }
  JSONObject reservations = json
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(reservations);
  String reservationName = reservations
      .getJSONObject("reservation-definition")
      .getString("reservation-name");
  // JUnit convention: expected value first, actual value second.
  assertEquals("res_1", reservationName);
  rm.stop();
}
/**
 * Lists reservations with start-time equal to end-time, both set to the
 * midpoint between the arrival of res_1 and the arrival of res_2. Only res_1
 * is expected in the result.
 */
@Test
public void testSameTimeIntervalRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  ReservationId id1 = getReservationIdTestHelper(1);
  ReservationId id2 = getReservationIdTestHelper(2);
  // Defensive guard. getReservationIdTestHelper returns a fallback id when
  // authentication is disabled, so this is not expected to trigger.
  if (!this.isAuthenticationEnabled() && id1 == null && id2 == null) {
    return;
  }
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time, "res_1", id1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
      "res_2", id2);
  String timeParam = Long.toString(time + MINIMUM_RESOURCE_DURATION / 2);
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("start-time", timeParam)
      .queryParam("end-time", timeParam)
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  JSONObject json = testListReservationHelper(target);
  if (!this.isAuthenticationEnabled() && json == null) {
    return;
  }
  JSONObject reservations = json
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(reservations);
  String reservationName = reservations
      .getJSONObject("reservation-definition")
      .getString("reservation-name");
  // JUnit convention: expected value first, actual value second.
  assertEquals("res_1", reservationName);
  rm.stop();
}
/**
 * Lists reservations with start-time and end-time both set to -100 and
 * expects both submitted reservations to be returned.
 */
@Test
public void testInvalidTimeIntervalRequestListReservation() throws
    Exception {
  rm.start();
  setupCluster(100);
  final long firstArrival = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  final ReservationId firstId = getReservationIdTestHelper(1);
  final ReservationId secondId = getReservationIdTestHelper(2);

  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, firstArrival, "res_1", firstId);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, firstArrival + MINIMUM_RESOURCE_DURATION,
      "res_2", secondId);

  WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH);
  listTarget = listTarget.queryParam("start-time", "-100");
  listTarget = listTarget.queryParam("end-time", "-100");
  listTarget = listTarget.queryParam("include-resource-allocations", "true");
  listTarget = listTarget.queryParam("queue", DEFAULT_QUEUE);

  final JSONObject listing = testListReservationHelper(listTarget);
  if (listing == null && !isAuthenticationEnabled()) {
    return;
  }

  final JSONArray found = listing
      .getJSONObject("reservationListInfo")
      .getJSONArray("reservations");
  assertEquals(2, found.length());
  for (int i = 0; i < 2; i++) {
    testRDLHelper(found.getJSONObject(i));
  }
  rm.stop();
}
/**
 * Lists reservations with a start-time after the arrival of res_2 and an
 * end-time of -1. Without recurrence only res_2 is expected; with recurrence
 * both reservations are expected.
 */
@Test
public void testInvalidEndTimeRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final long firstArrival = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  final ReservationId firstId = getReservationIdTestHelper(1);
  final ReservationId secondId = getReservationIdTestHelper(2);

  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, firstArrival, "res_1", firstId);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, firstArrival + MINIMUM_RESOURCE_DURATION,
      "res_2", secondId);

  final String startTime = Long.toString(
      (long) (firstArrival + MINIMUM_RESOURCE_DURATION * 1.3));
  final WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("start-time", startTime)
      .queryParam("end-time", "-1")
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);

  final JSONObject listing = testListReservationHelper(listTarget);
  if (listing == null && !isAuthenticationEnabled()) {
    return;
  }

  final JSONObject listInfo = listing.getJSONObject("reservationListInfo");
  if (enableRecurrence) {
    // Recurring reservations: the search interval exceeds the reservation
    // period, so both reservations are matched.
    final JSONArray found = listInfo.getJSONArray("reservations");
    assertEquals(2, found.length());
  } else {
    final JSONObject found = listInfo.getJSONObject("reservations");
    testRDLHelper(found);
    final String foundName = found
        .getJSONObject("reservation-definition")
        .getString("reservation-name");
    assertEquals("res_2", foundName);
  }
  rm.stop();
}
/**
 * Lists reservations with a start-time after the arrival of res_2 and no
 * end-time at all. Without recurrence only res_2 is expected; with
 * recurrence both reservations are expected.
 */
@Test
public void testEmptyEndTimeRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  ReservationId id1 = getReservationIdTestHelper(1);
  ReservationId id2 = getReservationIdTestHelper(2);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time, "res_1", id1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
      "res_2", id2);
  // Long.toString avoids the deprecated Long(long) constructor and matches
  // the other list tests in this class.
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("start-time", Long.toString((long) (time
          + MINIMUM_RESOURCE_DURATION * 1.3)))
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  JSONObject json = testListReservationHelper(target);
  if (!this.isAuthenticationEnabled() && json == null) {
    return;
  }
  if (!enableRecurrence) {
    JSONObject reservations = json
        .getJSONObject("reservationListInfo")
        .getJSONObject("reservations");
    testRDLHelper(reservations);
    String reservationName = reservations
        .getJSONObject("reservation-definition")
        .getString("reservation-name");
    assertEquals("res_2", reservationName);
  } else {
    // In the case of recurring reservations, both reservations will be
    // picked up by the search interval since it is greater than the period
    // of the reservation.
    JSONArray reservations = json
        .getJSONObject("reservationListInfo")
        .getJSONArray("reservations");
    assertEquals(2, reservations.length());
  }
  rm.stop();
}
/**
 * Lists reservations with a start-time of -1 and an end-time that lies
 * before the arrival of res_2. Only res_1 is expected in the result.
 */
@Test
public void testInvalidStartTimeRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  ReservationId id1 = getReservationIdTestHelper(1);
  ReservationId id2 = getReservationIdTestHelper(2);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time, "res_1", id1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
      "res_2", id2);
  // Long.toString avoids the deprecated Long(long) constructor.
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("start-time", "-1")
      .queryParam("end-time", Long.toString((long) (time
          + MINIMUM_RESOURCE_DURATION * 0.9)))
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  JSONObject json = testListReservationHelper(target);
  if (!this.isAuthenticationEnabled() && json == null) {
    return;
  }
  JSONObject reservations = json
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(reservations);
  // only res_1 should fall into the time interval given in the request json.
  String reservationName = reservations
      .getJSONObject("reservation-definition")
      .getString("reservation-name");
  // JUnit convention: expected value first, actual value second.
  assertEquals("res_1", reservationName);
  rm.stop();
}
/**
 * Lists reservations with no start-time and an end-time that lies before the
 * arrival of res_2. Only res_1 is expected in the result.
 */
@Test
public void testEmptyStartTimeRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  ReservationId id1 = getReservationIdTestHelper(1);
  ReservationId id2 = getReservationIdTestHelper(2);
  long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time, "res_1", id1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
      "res_2", id2);
  // Long.toString avoids the deprecated Long(long) constructor.
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("end-time", Long.toString((long) (time
          + MINIMUM_RESOURCE_DURATION * 0.9)))
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  JSONObject json = testListReservationHelper(target);
  if (!this.isAuthenticationEnabled() && json == null) {
    return;
  }
  JSONObject reservations = json
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(reservations);
  // only res_1 should fall into the time interval given in the request json.
  String reservationName = reservations
      .getJSONObject("reservation-definition")
      .getString("reservation-name");
  // JUnit convention: expected value first, actual value second.
  assertEquals("res_1", reservationName);
  rm.stop();
}
/**
 * Lists reservations with only the queue parameter set and expects both
 * submitted reservations to be returned.
 */
@Test
public void testQueueOnlyRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId firstId = getReservationIdTestHelper(1);
  final ReservationId secondId = getReservationIdTestHelper(2);

  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_1", firstId);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_2", secondId);

  final WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("queue", DEFAULT_QUEUE);
  final JSONObject listing = testListReservationHelper(listTarget);
  if (listing == null && !isAuthenticationEnabled()) {
    return;
  }

  final JSONArray found = listing
      .getJSONObject("reservationListInfo")
      .getJSONArray("reservations");
  assertThat(found.length()).isEqualTo(2);
  testRDLHelper(found.getJSONObject(0));
  testRDLHelper(found.getJSONObject(1));
  rm.stop();
}
/**
 * Lists reservations without a queue parameter; the list helper is invoked
 * with an expected status of BAD_REQUEST.
 */
@Test
public void testEmptyQueueRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId firstId = getReservationIdTestHelper(1);
  final ReservationId secondId = getReservationIdTestHelper(2);

  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_1", firstId);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_2", secondId);

  final WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH);
  testListReservationHelper(listTarget, Response.Status.BAD_REQUEST);
  rm.stop();
}
/**
 * Lists reservations for a queue name that is not configured; the list
 * helper is invoked with an expected status of BAD_REQUEST.
 */
@Test
public void testNonExistentQueueRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId firstId = getReservationIdTestHelper(1);
  final ReservationId secondId = getReservationIdTestHelper(2);

  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_1", firstId);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_2", secondId);

  final String unknownQueue = DEFAULT_QUEUE + "_invalid";
  final WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("queue", unknownQueue);
  testListReservationHelper(listTarget, Response.Status.BAD_REQUEST);
  rm.stop();
}
/**
 * Lists reservations filtered by the id of the first reservation and expects
 * exactly that reservation back. Note that res_1 is submitted twice with the
 * same id before res_2 is submitted.
 */
@Test
public void testReservationIdRequestListReservation() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId firstId = getReservationIdTestHelper(1);
  final ReservationId secondId = getReservationIdTestHelper(2);

  for (int attempt = 0; attempt < 2; attempt++) {
    reservationSubmissionTestHelper("reservation/submit",
        MediaType.APPLICATION_JSON, clock.getTime(), "res_1", firstId);
  }
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_2", secondId);

  WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  if (firstId != null) {
    listTarget = listTarget.queryParam("reservation-id", firstId.toString());
  }

  final JSONObject listing = testListReservationHelper(listTarget);
  if (listing == null && !isAuthenticationEnabled()) {
    return;
  }

  final JSONObject found = listing
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(found);
  final String foundId = found.getString("reservation-id");
  assertEquals(firstId.toString(), foundId);
  rm.stop();
}
/**
 * Lists reservations with a malformed reservation id ("invalid" prefixed to
 * a real id); the list helper is invoked with an expected status of
 * NOT_FOUND.
 */
@Test
public void testInvalidReservationIdRequestListReservation() throws
    Exception {
  rm.start();
  setupCluster(100);
  ReservationId id1 = getReservationIdTestHelper(1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("queue", DEFAULT_QUEUE);
  if (id1 != null) {
    target = target.queryParam("reservation-id",
        "invalid" + id1.toString());
  }
  // The helper checks the status itself; its return value is not needed.
  testListReservationHelper(target, Response.Status.NOT_FOUND);
  rm.stop();
}
/**
 * Lists a single reservation with include-resource-allocations=true and
 * expects the "resource-allocations" key to be present in the result.
 */
@Test
public void testIncludeResourceAllocations() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId reservationId = getReservationIdTestHelper(1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_1", reservationId);

  WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("include-resource-allocations", "true")
      .queryParam("queue", DEFAULT_QUEUE);
  if (reservationId != null) {
    listTarget =
        listTarget.queryParam("reservation-id", reservationId.toString());
  }

  final JSONObject listing = testListReservationHelper(listTarget);
  if (listing == null && !isAuthenticationEnabled()) {
    return;
  }

  final JSONObject found = listing
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(found);
  final String foundId = found.getString("reservation-id");
  assertEquals(reservationId.toString(), foundId);
  assertTrue(found.has("resource-allocations"));
  rm.stop();
}
/**
 * Lists a single reservation with include-resource-allocations=false and
 * expects the "resource-allocations" key to be absent from the result.
 */
@Test
public void testExcludeResourceAllocations() throws Exception {
  rm.start();
  setupCluster(100);
  final ReservationId reservationId = getReservationIdTestHelper(1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, clock.getTime(), "res_1", reservationId);

  WebTarget listTarget = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("include-resource-allocations", "false")
      .queryParam("queue", DEFAULT_QUEUE);
  if (reservationId != null) {
    listTarget =
        listTarget.queryParam("reservation-id", reservationId.toString());
  }

  final JSONObject listing = testListReservationHelper(listTarget);
  if (listing == null && !isAuthenticationEnabled()) {
    return;
  }

  final JSONObject found = listing
      .getJSONObject("reservationListInfo")
      .getJSONObject("reservations");
  testRDLHelper(found);
  final String foundId = found.getString("reservation-id");
  assertEquals(reservationId.toString(), foundId);
  assertTrue(!found.has("resource-allocations"));
  rm.stop();
}
/**
 * Registers 100 nodes, submits a reservation and then runs the delete helper
 * against the same reservation id.
 */
@Test
public void testDeleteReservation() throws Exception {
  rm.start();
  // Register 100 node managers of 100 GB each and heartbeat each once.
  for (int i = 0; i < 100; i++) {
    MockNM amNodeManager =
        rm.registerNode("127.0.0." + i + ":1234", 100 * 1024);
    amNodeManager.nodeHeartbeat(true);
  }
  ReservationId rid = getReservationIdTestHelper(1);
  reservationSubmissionTestHelper("reservation/submit",
      MediaType.APPLICATION_JSON, rid);
  testDeleteReservationHelper("reservation/delete", rid,
      MediaType.APPLICATION_JSON);
  rm.stop();
}
/**
 * Obtains a ReservationId for a test by POSTing to the new-reservation REST
 * endpoint. When authentication is not enabled the call is expected to be
 * rejected as unauthorized; a locally generated ReservationId is returned
 * instead, because downstream helpers need one.
 *
 * @param fallbackReservationId id component used for the locally generated
 *                              ReservationId when authentication is disabled
 * @return the reservation id, or {@code null} if the response carried no
 *         readable "reservation-id" entry
 * @throws Exception if the request or the response handling fails
 */
private ReservationId getReservationIdTestHelper(int fallbackReservationId)
    throws Exception {
  Thread.sleep(1000);
  final boolean authEnabled = this.isAuthenticationEnabled();
  if (authEnabled) {
    // Make the mocked request report the test user as its principal.
    final Principal caller = () -> webserviceUserName;
    when(hsRequest.getUserPrincipal()).thenReturn(caller);
  }

  final WebTarget newReservationTarget =
      constructWebResource(GET_NEW_RESERVATION_PATH);
  final Response response = newReservationTarget
      .request(MediaType.APPLICATION_JSON)
      .accept(MediaType.APPLICATION_JSON)
      .post(null, Response.class);

  if (!authEnabled) {
    assertResponseStatusCode(Response.Status.UNAUTHORIZED,
        response.getStatusInfo());
    return ReservationId.newInstance(clock.getTime(), fallbackReservationId);
  }

  System.out.println("RESPONSE:" + response);
  assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
      response.getMediaType().toString());
  final JSONObject newReservation = response
      .readEntity(JSONObject.class)
      .getJSONObject("new-reservation");
  assertEquals("incorrect number of elements", 1, newReservation.length());

  try {
    return ReservationId.parseReservationId(
        newReservation.getString("reservation-id"));
  } catch (JSONException ignored) {
    // failure is possible and is checked outside
    return null;
  }
}
/**
 * Submits a reservation named "res_1" whose arrival is the current time plus
 * {@code MINIMUM_RESOURCE_DURATION}.
 *
 * @param path REST path of the submit endpoint
 * @param media media type sent in the Accept header
 * @param reservationId id of the reservation to submit
 * @return the HTTP response of the submission
 */
private Response reservationSubmissionTestHelper(String path,
    String media, ReservationId reservationId) throws Exception {
  final long defaultArrival = clock.getTime() + MINIMUM_RESOURCE_DURATION;
  final String defaultName = "res_1";
  return reservationSubmissionTestHelper(path, media, defaultArrival,
      defaultName, reservationId);
}
/**
 * Fills the "submit-reservation.json" template and submits the result.
 * The template receives, in order: the reservation id, the arrival, the
 * arrival plus {@code MINIMUM_RESOURCE_DURATION}, the reservation name and
 * the recurrence fragment (empty unless recurrence is enabled).
 *
 * @param path REST path of the submit endpoint
 * @param media media type sent in the Accept header
 * @param arrival arrival time placed into the template
 * @param reservationName name placed into the template
 * @param reservationId id of the reservation to submit
 * @return the HTTP response of the submission
 */
private Response reservationSubmissionTestHelper(String path,
    String media, Long arrival, String reservationName,
    ReservationId reservationId) throws Exception {
  final String template = loadJsonFile("submit-reservation.json");
  final String recurrenceFragment = enableRecurrence
      ? String.format("\"recurrence-expression\" : \"%s\",",
          DEFAULT_RECURRENCE)
      : "";
  final String requestJson = String.format(template,
      reservationId.toString(),
      arrival,
      arrival + MINIMUM_RESOURCE_DURATION,
      reservationName,
      recurrenceFragment);
  return submitAndVerifyReservation(path, media, requestJson);
}
/**
 * Round-trips the given JSON through
 * {@code ReservationSubmissionRequestInfo} and posts it to {@code path}
 * after a one second pause.
 *
 * <p>When authentication is disabled the response is asserted to be
 * UNAUTHORIZED; the response is returned to the caller in either case.
 *
 * @param path web service path to post to
 * @param media media type to accept in the response
 * @param reservationJson JSON text of the submission request
 * @return the response of the POST
 * @throws Exception if unmarshalling, the sleep or the request fail
 */
private Response submitAndVerifyReservation(String path, String media,
    String reservationJson) throws Exception {
  StringReader jsonReader = new StringReader(reservationJson);
  ReservationSubmissionRequestInfo submission =
      reservationSubmissionRequestInfoReader.unmarshalFromJSON(
          jsonReader, ReservationSubmissionRequestInfo.class);

  Thread.sleep(1000);

  WebTarget endpoint = constructWebResource(path);
  Response response = endpoint
      .request(MediaType.APPLICATION_JSON)
      .accept(media)
      .post(
          Entity.entity(
              toJson(submission, ReservationSubmissionRequestInfo.class),
              MediaType.APPLICATION_JSON_TYPE),
          Response.class);

  if (!this.isAuthenticationEnabled()) {
    assertResponseStatusCode(Response.Status.UNAUTHORIZED, response.getStatusInfo());
  }
  return response;
}
/**
 * Posts an update request built from {@code update-reservation.json} and
 * checks the outcome.
 *
 * <p>With authentication enabled the template's reservation id is replaced
 * by {@code reservationId} and the response must be JSON with status OK.
 * With authentication disabled the response must be UNAUTHORIZED.
 *
 * @param path web service path to post to
 * @param reservationId id of the reservation to update
 * @param media media type to accept in the response
 * @throws Exception if loading, unmarshalling, the sleep or the request
 *     fail; an {@code IOException} is raised when the template yields no
 *     reservation id
 */
private void updateReservationTestHelper(String path,
    ReservationId reservationId, String media) throws Exception {
  String template = loadJsonFile("update-reservation.json");

  JettisonUnmarshaller unmarshaller;
  try {
    unmarshaller = new JettisonJaxbContext(ReservationUpdateRequestInfo.class)
        .createJsonUnmarshaller();
  } catch (JAXBException e) {
    throw new RuntimeException(e);
  }

  ReservationUpdateRequestInfo updateRequest = unmarshaller.unmarshalFromJSON(
      new StringReader(template), ReservationUpdateRequestInfo.class);

  if (this.isAuthenticationEnabled()) {
    // only works when previous submit worked
    if (updateRequest.getReservationId() == null) {
      throw new IOException("Incorrectly parsed the reservationId");
    }
    updateRequest.setReservationId(reservationId.toString());
  }

  Thread.sleep(1000);

  WebTarget endpoint = constructWebResource(path);
  Response response = endpoint
      .request(MediaType.APPLICATION_JSON)
      .accept(media)
      .post(
          Entity.entity(
              toJson(updateRequest, ReservationUpdateRequestInfo.class),
              MediaType.APPLICATION_JSON_TYPE),
          Response.class);

  if (this.isAuthenticationEnabled()) {
    System.out.println("RESPONSE:" + response);
    String expectedType = MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8;
    assertEquals(expectedType, response.getMediaType().toString());
    assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
  } else {
    assertResponseStatusCode(Response.Status.UNAUTHORIZED, response.getStatusInfo());
  }
}
/**
 * Loads a classpath resource and returns its content as a string.
 *
 * <p>The resource is looked up through the thread context class loader,
 * falling back to the class loader of {@code Configuration} when the thread
 * has none. The file is decoded as UTF-8 rather than with the platform
 * default charset.
 *
 * @param filename name of the resource to load
 * @return the full content of the resource
 * @throws IOException if the resource is not found or cannot be read
 */
private String loadJsonFile(String filename) throws IOException {
  ClassLoader cL = Thread.currentThread().getContextClassLoader();
  if (cL == null) {
    cL = Configuration.class.getClassLoader();
  }
  URL resource = cL.getResource(filename);
  if (resource == null) {
    // getResource signals a missing resource with null; fail with the file
    // name instead of a bare NullPointerException.
    throw new IOException("Resource not found on the classpath: " + filename);
  }
  return FileUtils.readFileToString(
      new File(resource.getFile()), java.nio.charset.StandardCharsets.UTF_8);
}
/**
 * Posts a delete request built from {@code delete-reservation.json} and
 * checks the outcome.
 *
 * <p>With authentication enabled the template's reservation id is replaced
 * by {@code reservationId} and the response must be JSON with status OK.
 * With authentication disabled the response must be UNAUTHORIZED.
 *
 * @param path web service path to post to
 * @param reservationId id of the reservation to delete
 * @param media media type to accept in the response
 * @throws JSONException declared by the original signature
 * @throws Exception if loading, unmarshalling, the sleep or the request
 *     fail; an {@code IOException} is raised when the template yields no
 *     reservation id
 */
private void testDeleteReservationHelper(String path,
    ReservationId reservationId, String media) throws JSONException,
    Exception {
  String template = loadJsonFile("delete-reservation.json");

  JettisonUnmarshaller unmarshaller;
  try {
    JettisonJaxbContext context =
        new JettisonJaxbContext(ReservationDeleteRequestInfo.class);
    unmarshaller = context.createJsonUnmarshaller();
  } catch (JAXBException e) {
    throw new RuntimeException(e);
  }

  StringReader templateReader = new StringReader(template);
  ReservationDeleteRequestInfo deleteRequest = unmarshaller.unmarshalFromJSON(
      templateReader, ReservationDeleteRequestInfo.class);

  if (this.isAuthenticationEnabled()) {
    // only works when previous submit worked
    if (deleteRequest.getReservationId() == null) {
      throw new IOException("Incorrectly parsed the reservationId");
    }
    deleteRequest.setReservationId(reservationId.toString());
  }

  Thread.sleep(1000);

  WebTarget endpoint = constructWebResource(path);
  Response response = endpoint
      .request(MediaType.APPLICATION_JSON)
      .accept(media)
      .post(
          Entity.entity(
              toJson(deleteRequest, ReservationDeleteRequestInfo.class),
              MediaType.APPLICATION_JSON_TYPE),
          Response.class);

  if (this.isAuthenticationEnabled()) {
    System.out.println("RESPONSE:" + response);
    String expectedType = MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8;
    assertEquals(expectedType, response.getMediaType().toString());
    assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
  } else {
    assertResponseStatusCode(Response.Status.UNAUTHORIZED, response.getStatusInfo());
  }
}
/**
 * Checks the reservation definition inside a reservation JSON object.
 *
 * <p>Asserts that {@code reservation-request-interpreter} is {@code "0"} and
 * that the first {@code reservation-request} entry has a duration of 60.
 *
 * @param json object containing a {@code reservation-definition} member
 * @throws JSONException if an expected member is missing or has another type
 */
private void testRDLHelper(JSONObject json) throws JSONException {
  JSONObject definition = json.getJSONObject("reservation-definition");
  JSONObject requestGroup = definition.getJSONObject("reservation-requests");

  assertEquals("0", requestGroup.getString("reservation-request-interpreter"));

  JSONObject firstRequest =
      requestGroup.getJSONArray("reservation-request").getJSONObject(0);
  assertEquals(60, firstRequest.getInt("duration"));
}
/**
 * Lists reservations through {@code target}, expecting status OK.
 *
 * @param target fully configured list-reservation target
 * @return whatever the two-argument overload returns for status OK
 * @throws Exception if the delegated call fails
 */
private JSONObject testListReservationHelper(WebTarget target) throws Exception {
  final Response.Status expectedStatus = Response.Status.OK;
  return testListReservationHelper(target, expectedStatus);
}
/**
 * Issues a GET on {@code target} after a one second pause and validates the
 * response.
 *
 * @param target fully configured list-reservation target
 * @param status status the response must carry when authentication is
 *     enabled
 * @return the response entity as a JSON object, or {@code null} when
 *     authentication is disabled (after asserting UNAUTHORIZED)
 * @throws Exception if the sleep or the request fail
 */
private JSONObject testListReservationHelper(WebTarget target, Response.Status
    status) throws Exception {
  Thread.sleep(1000);
  Response listing = target.request().get(Response.class);

  if (this.isAuthenticationEnabled()) {
    String expectedType = MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8;
    assertEquals(expectedType, listing.getMediaType().toString());
    assertResponseStatusCode(status, listing.getStatusInfo());
    return listing.readEntity(JSONObject.class);
  }

  assertResponseStatusCode(Response.Status.UNAUTHORIZED, listing.getStatusInfo());
  return null;
}
/**
 * Lists the reservations of {@code DEFAULT_QUEUE} and checks how many come
 * back.
 *
 * <p>Returns without further checks when the list helper yields no JSON
 * payload, which it does after asserting UNAUTHORIZED when authentication is
 * disabled.
 *
 * @param count expected number of reservations
 * @throws Exception if the list request fails or the JSON does not have the
 *     expected shape
 */
private void verifyReservationCount(int count) throws Exception {
  WebTarget target = constructWebResource(LIST_RESERVATION_PATH)
      .queryParam("queue", DEFAULT_QUEUE);

  JSONObject json = testListReservationHelper(target);
  if (json == null) {
    // No payload to inspect: the list helper returns null once it has
    // asserted the UNAUTHORIZED response for disabled authentication.
    return;
  }

  if (count == 1) {
    // If there are any number other than one reservation, this will throw.
    json.getJSONObject("reservationListInfo").getJSONObject("reservations");
  } else {
    // NOTE(review): this branch reads "reservations" from the top-level
    // object while the single-reservation branch reads it from
    // "reservationListInfo" - confirm which path is intended.
    JSONArray reservations = json.getJSONArray("reservations");
    assertEquals("unexpected number of reservations",
        count, reservations.length());
  }
}
/**
 * Tells whether the response carries a 2xx status code.
 *
 * @param response response to inspect
 * @return {@code true} when the status code lies in the range 200-299
 */
private boolean isHttpSuccessResponse(Response response) {
  final int statusCode = response.getStatusInfo().getStatusCode();
  return statusCode >= 200 && statusCode < 300;
}
/**
 * Registers {@code nodes} mock node managers with {@code rm} and sends one
 * heartbeat from each.
 *
 * <p>Node {@code i} is registered under the address
 * {@code 127.0.0.<i>:1234} with a capacity argument of {@code 100 * 1024}.
 *
 * @param nodes number of nodes to register
 * @throws Exception if registration or the heartbeat fails
 */
private void setupCluster(int nodes) throws Exception {
  final int nodeCapacity = 100 * 1024;
  int index = 0;
  while (index < nodes) {
    String address = "127.0.0." + index + ":1234";
    rm.registerNode(address, nodeCapacity).nodeHeartbeat(true);
    index++;
  }
}
}