TestRMWebServicesAppsModification.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.thirdparty.com.google.common.net.HttpHeaders;
import org.apache.hadoop.util.XMLUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.reader.AppStateReader;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.reader.ApplicationSubmissionContextInfoReader;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.writer.ApplicationSubmissionContextInfoWriter;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import com.google.inject.Singleton;
import org.glassfish.jersey.jettison.JettisonJaxbContext;
import org.glassfish.jersey.jettison.JettisonMarshaller;
import org.glassfish.jersey.jettison.internal.entity.JettisonObjectProvider.App;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.TestProperties;
import static javax.ws.rs.core.Response.Status.ACCEPTED;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.OK;
import static javax.ws.rs.core.Response.Status.UNAUTHORIZED;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestRMWebServicesAppsModification extends JerseyTestBase {
private static MockRM rm;
private static final int CONTAINER_MB = 1024;
private String webserviceUserName = "testuser";
private boolean setAuthFilter = false;
private static final String TEST_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
private static final String FS_ALLOC_FILE = new File(TEST_DIR,
"test-fs-queues.xml").getAbsolutePath();
private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
private static final QueuePath DEFAULT = ROOT.createNewLeaf("default");
private static final QueuePath TEST = ROOT.createNewLeaf("test");
private static final QueuePath TEST_QUEUE = ROOT.createNewLeaf("testqueue");
private ResourceConfig config;
private HttpServletRequest hsRequest = mock(HttpServletRequest.class);
private HttpServletResponse hsResponse = mock(HttpServletResponse.class);
private static final JettisonMarshaller APP_STATE_WRITER;
static {
try {
JettisonJaxbContext jettisonJaxbContext = new JettisonJaxbContext(AppState.class);
APP_STATE_WRITER = jettisonJaxbContext.createJsonMarshaller();
} catch (JAXBException e) {
throw new RuntimeException(e);
}
}
private static final JettisonMarshaller APP_PRIORITY_WRITER;
static {
try {
JettisonJaxbContext jettisonJaxbContext = new JettisonJaxbContext(AppPriority.class);
APP_PRIORITY_WRITER = jettisonJaxbContext.createJsonMarshaller();
} catch (JAXBException e) {
throw new RuntimeException(e);
}
}
private static final JettisonMarshaller APP_QUEUE_WRITER;
static {
try {
JettisonJaxbContext jettisonJaxbContext = new JettisonJaxbContext(AppQueue.class);
APP_QUEUE_WRITER = jettisonJaxbContext.createJsonMarshaller();
} catch (JAXBException e) {
throw new RuntimeException(e);
}
}
private static final JettisonMarshaller APP_TIMEOUT_WRITER;
static {
try {
JettisonJaxbContext jettisonJaxbContext = new JettisonJaxbContext(AppTimeoutInfo.class);
APP_TIMEOUT_WRITER = jettisonJaxbContext.createJsonMarshaller();
} catch (JAXBException e) {
throw new RuntimeException(e);
}
}
@Override
protected Application configure() {
config = new ResourceConfig();
config.register(RMWebServices.class);
config.register(GenericExceptionHandler.class);
config.register(ApplicationSubmissionContextInfoWriter.class);
config.register(ApplicationSubmissionContextInfoReader.class);
config.register(TestRMCustomAuthFilter.class);
config.register(new JettisonFeature()).register(JAXBContextResolver.class);
forceSet(TestProperties.CONTAINER_PORT, JERSEY_RANDOM_PORT);
return config;
}
private class JerseyBinder extends AbstractBinder {
private Configuration conf = new YarnConfiguration();
@Override
protected void configure() {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
configureScheduler();
rm = new MockRM(conf);
bind(rm).to(ResourceManager.class).named("rm");
bind(conf).to(Configuration.class).named("conf");
bind(hsRequest).to(HttpServletRequest.class);
bind(hsResponse).to(HttpServletResponse.class);
}
public void configureScheduler() {
}
public Configuration getConf() {
return conf;
}
}
/*
* Helper class to allow testing of RM web services which require
* authorization Add this class as a filter in the Guice injector for the
* MockRM
*/
@Singleton
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
@Override
protected Properties getConfiguration(String configPrefix,
FilterConfig filterConfig) throws ServletException {
Properties props = new Properties();
Enumeration<?> names = filterConfig.getInitParameterNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
if (name.startsWith(configPrefix)) {
String value = filterConfig.getInitParameter(name);
props.put(name.substring(configPrefix.length()), value);
}
}
props.put(AuthenticationFilter.AUTH_TYPE, "simple");
props.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
return props;
}
}
private class CapTestServletModule extends JerseyBinder {
CapTestServletModule(boolean flag) {
if(flag) {
getConf().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
getConf().setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
}
}
@Override
public void configureScheduler() {
getConf().set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
}
}
private class FairTestServletModule extends JerseyBinder {
FairTestServletModule(boolean flag) {
if(flag) {
getConf().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
// set the admin acls otherwise all users are considered admins
// and we can't test authorization
getConf().setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
}
}
@Override
public void configureScheduler() {
AllocationFileWriter.create()
.addQueue(new AllocationFileQueue.Builder("root")
.aclAdministerApps("someuser ")
.subQueue(new AllocationFileQueue.Builder("default")
.aclAdministerApps("someuser ").build())
.subQueue(new AllocationFileQueue.Builder("test")
.aclAdministerApps("someuser ").build())
.build())
.writeToFile(FS_ALLOC_FILE);
getConf().set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
getConf().set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
}
}
private CapTestServletModule getNoAuthInjectorCap() {
setAuthFilter = false;
return new CapTestServletModule(false);
}
private CapTestServletModule getSimpleAuthInjectorCap() {
setAuthFilter = true;
Principal principal = () -> "testuser";
when(hsRequest.getUserPrincipal()).thenReturn(principal);
return new CapTestServletModule(true);
}
private FairTestServletModule getNoAuthInjectorFair() {
setAuthFilter = false;
return new FairTestServletModule(false);
}
private FairTestServletModule getSimpleAuthInjectorFair() {
setAuthFilter = true;
Principal principal = () -> "testuser";
when(hsRequest.getUserPrincipal()).thenReturn(principal);
return new FairTestServletModule(true);
}
@Parameters
public static Collection<Object[]> guiceConfigs() {
return Arrays.asList(new Object[][] { { 0 }, { 1 }, { 2 }, { 3 } });
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
public TestRMWebServicesAppsModification(int run) throws JAXBException {
switch (run) {
case 0:
default:
// No Auth Capacity Scheduler
config.register(getNoAuthInjectorCap());
break;
case 1:
// Simple Auth Capacity Scheduler
config.register(getSimpleAuthInjectorCap());
break;
case 2:
// No Auth Fair Scheduler
config.register(getNoAuthInjectorFair());
break;
case 3:
// Simple Auth Fair Scheduler
config.register(getSimpleAuthInjectorFair());
break;
}
}
private boolean isAuthenticationEnabled() {
return setAuthFilter;
}
private WebTarget constructWebResource(WebTarget r, String... paths) {
WebTarget rt = r;
for (String path : paths) {
rt = rt.path(path);
}
if (isAuthenticationEnabled()) {
rt = rt.queryParam("user.name", webserviceUserName);
}
return rt;
}
private WebTarget constructWebResource(String... paths) {
WebTarget r = target()
.register(App.class)
.register(AppStateReader.class)
.register(ApplicationSubmissionContextInfoReader.class)
.register(ApplicationSubmissionContextInfoWriter.class);
WebTarget ws = r.path("ws").path("v1").path("cluster");
return this.constructWebResource(ws, paths);
}
@Test
public void testSingleAppState() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String mediaType : mediaTypes) {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
Response response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").request(mediaType).get(Response.class);
assertResponseStatusCode(OK, response.getStatusInfo());
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppStateJson(response, RMAppState.ACCEPTED);
} else if (mediaType.contains(MediaType.APPLICATION_XML)) {
verifyAppStateXML(response, RMAppState.ACCEPTED);
}
}
rm.stop();
}
@Test(timeout = 120000)
public void testSingleAppKill() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
String diagnostic = "message1";
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
AppState targetState =
new AppState(YarnApplicationState.KILLED.toString());
targetState.setDiagnostics(diagnostic);
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
}
Response response =
this.constructWebResource("apps", app.getApplicationId().toString(),
"state").request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(UNAUTHORIZED,
response.getStatusInfo());
continue;
}
assertResponseStatusCode(ACCEPTED, response.getStatusInfo());
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppStateJson(response, RMAppState.FINAL_SAVING,
RMAppState.KILLED, RMAppState.KILLING, RMAppState.ACCEPTED);
} else {
verifyAppStateXML(response, RMAppState.FINAL_SAVING,
RMAppState.KILLED, RMAppState.KILLING, RMAppState.ACCEPTED);
}
String locationHeaderValue = this.constructWebResource(
"apps", app.getApplicationId().toString(), "state")
.getUri().toString().replace("?user.name=testuser", "");
Client c = ClientBuilder.newClient();
WebTarget tmp = c.target(locationHeaderValue);
if (isAuthenticationEnabled()) {
tmp = tmp.queryParam("user.name", webserviceUserName);
}
response = tmp.request().get(Response.class);
assertResponseStatusCode(OK, response.getStatusInfo());
assertTrue(locationHeaderValue.endsWith("/ws/v1/cluster/apps/"
+ app.getApplicationId().toString() + "/state"));
while (true) {
Thread.sleep(100);
response =
this
.constructWebResource("apps",
app.getApplicationId().toString(), "state").request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
assertTrue(
(response.getStatusInfo().getStatusCode()
== ACCEPTED.getStatusCode())
|| (response.getStatusInfo().getStatusCode()
== OK.getStatusCode()));
if (response.getStatusInfo().getStatusCode()
== OK.getStatusCode()) {
assertEquals(RMAppState.KILLED, app.getState());
if (mediaType.equals(MediaType.APPLICATION_JSON)) {
verifyAppStateJson(response, RMAppState.KILLED);
} else {
verifyAppStateXML(response, RMAppState.KILLED);
}
assertTrue("Diagnostic message is incorrect",
app.getDiagnostics().toString().contains(diagnostic));
break;
}
}
}
}
rm.stop();
}
@Test
public void testSingleAppKillInvalidState() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
String[] targetStates =
{ YarnApplicationState.FINISHED.toString(), "blah" };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
for (String targetStateString : targetStates) {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
Response response;
AppState targetState = new AppState(targetStateString);
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
}
response =
this
.constructWebResource("apps",
app.getApplicationId().toString(), "state")
.request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(Response.Status.UNAUTHORIZED,
response.getStatusInfo());
continue;
}
assertResponseStatusCode(BAD_REQUEST, response.getStatusInfo());
}
}
}
rm.stop();
}
private static String appStateToJSON(AppState state) throws Exception {
StringWriter stringWriter = new StringWriter();
APP_STATE_WRITER.marshallToJSON(state, stringWriter);
return stringWriter.toString();
}
protected static void verifyAppStateJson(Response response,
RMAppState... states) throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
String responseState = json.getJSONObject("appstate").getString("state");
boolean valid = false;
for (RMAppState state : states) {
if (state.toString().equals(responseState)) {
valid = true;
}
}
String msg = "app state incorrect, got " + responseState;
assertTrue(msg, valid);
}
protected static void verifyAppStateXML(Response response,
RMAppState... appStates) throws ParserConfigurationException,
IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
String xml = response.readEntity(String.class);
DocumentBuilderFactory dbf = XMLUtils.newSecureDocumentBuilderFactory();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("appstate");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
String state = WebServicesTestUtils.getXmlString(element, "state");
boolean valid = false;
for (RMAppState appState : appStates) {
if (appState.toString().equals(state)) {
valid = true;
}
}
String msg = "app state incorrect, got " + state;
assertTrue(msg, valid);
}
@Test(timeout = 60000)
public void testSingleAppKillUnauthorized() throws Exception {
boolean isCapacityScheduler =
rm.getResourceScheduler() instanceof CapacityScheduler;
boolean isFairScheduler =
rm.getResourceScheduler() instanceof FairScheduler;
assumeTrue("This test is only supported on Capacity and Fair Scheduler",
isCapacityScheduler || isFairScheduler);
// FairScheduler use ALLOCATION_FILE to configure ACL
if (isCapacityScheduler) {
// default root queue allows anyone to have admin acl
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setAcl(ROOT, QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl(DEFAULT, QueueACL.ADMINISTER_QUEUE, "someuser");
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
}
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String mediaType : mediaTypes) {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("test")
.withUser("someuser")
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
Response response =
this.constructWebResource("apps", app.getApplicationId().toString(),
"state").request(mediaType).get(Response.class);
AppState info = response.readEntity(AppState.class);
info.setState(YarnApplicationState.KILLED.toString());
response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").request(mediaType)
.put(Entity.entity(info, MediaType.APPLICATION_XML), Response.class);
validateResponseStatus(response, Response.Status.FORBIDDEN);
}
rm.stop();
}
@Test
public void testSingleAppKillInvalidId() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
amNodeManager.nodeHeartbeat(true);
String[] testAppIds = { "application_1391705042196_0001", "random_string" };
for (int i = 0; i < testAppIds.length; i++) {
AppState info = new AppState("KILLED");
Response response =
this.constructWebResource("apps", testAppIds[i], "state")
.request(MediaType.APPLICATION_XML)
.put(Entity.xml(info), Response.class);
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(Response.Status.UNAUTHORIZED,
response.getStatusInfo());
continue;
}
if (i == 0) {
assertResponseStatusCode(Response.Status.NOT_FOUND,
response.getStatusInfo());
} else {
assertResponseStatusCode(BAD_REQUEST,
response.getStatusInfo());
}
}
rm.stop();
}
@After
@Override
public void tearDown() throws Exception {
if (rm != null) {
rm.stop();
}
super.tearDown();
}
/**
* Helper function to wrap frequently used code. It checks the response status
* and checks if it UNAUTHORIZED if we are running with authorization turned
* off or the param passed if we are running with authorization turned on.
*
* @param response
* the ClientResponse object to be checked
* @param expectedAuthorizedMode
* the expected Status in authorized mode.
*/
public void validateResponseStatus(Response response,
Response.Status expectedAuthorizedMode) {
validateResponseStatus(response, Response.Status.UNAUTHORIZED,
expectedAuthorizedMode);
}
/**
* Helper function to wrap frequently used code. It checks the response status
* and checks if it is the param expectedUnauthorizedMode if we are running
* with authorization turned off or the param expectedAuthorizedMode passed if
* we are running with authorization turned on.
*
* @param response
* the ClientResponse object to be checked
* @param expectedUnauthorizedMode
* the expected Status in unauthorized mode.
* @param expectedAuthorizedMode
* the expected Status in authorized mode.
*/
public void validateResponseStatus(Response response,
Response.Status expectedUnauthorizedMode, Response.Status expectedAuthorizedMode) {
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(expectedUnauthorizedMode,
response.getStatusInfo());
} else {
assertResponseStatusCode(expectedAuthorizedMode,
response.getStatusInfo());
}
}
// Simple test - just post to /apps/new-application and validate the response
@Test
public void testGetNewApplication() throws Exception {
rm.start();
String mediaTypes[] =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String acceptMedia : mediaTypes) {
testGetNewApplication(acceptMedia);
}
rm.stop();
}
protected String testGetNewApplication(String mediaType) throws JSONException,
ParserConfigurationException, IOException, SAXException {
Response response =
this.constructWebResource("apps", "new-application").request(mediaType)
.post(null, Response.class);
validateResponseStatus(response, OK);
if (!isAuthenticationEnabled()) {
return "";
}
return validateGetNewApplicationResponse(response);
}
protected String validateGetNewApplicationResponse(Response resp)
throws JSONException, ParserConfigurationException, IOException,
SAXException {
String ret = "";
if (resp.getMediaType().toString().contains(MediaType.APPLICATION_JSON)) {
JSONObject json = resp.readEntity(JSONObject.class);
JSONObject newApplication = json.getJSONObject("NewApplication");
ret = validateGetNewApplicationJsonResponse(newApplication);
} else if (resp.getMediaType().toString().contains(MediaType.APPLICATION_XML)) {
String xml = resp.readEntity(String.class);
ret = validateGetNewApplicationXMLResponse(xml);
} else {
// we should not be here
assertTrue(false);
}
return ret;
}
protected String validateGetNewApplicationJsonResponse(JSONObject json)
throws JSONException {
String appId = json.getString("application-id");
assertTrue(!appId.isEmpty());
JSONObject maxResources = json.getJSONObject("maximum-resource-capability");
long memory = maxResources.getLong("memory");
long vCores = maxResources.getLong("vCores");
assertTrue(memory != 0);
assertTrue(vCores != 0);
return appId;
}
protected String validateGetNewApplicationXMLResponse(String response)
throws ParserConfigurationException, IOException, SAXException {
DocumentBuilderFactory dbf = XMLUtils.newSecureDocumentBuilderFactory();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(response));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("NewApplication");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
String appId = WebServicesTestUtils.getXmlString(element, "application-id");
assertTrue(!appId.isEmpty());
NodeList maxResourceNodes =
element.getElementsByTagName("maximum-resource-capability");
assertEquals(1, maxResourceNodes.getLength());
Element maxResourceCapability = (Element) maxResourceNodes.item(0);
long memory =
WebServicesTestUtils.getXmlLong(maxResourceCapability, "memory");
long vCores =
WebServicesTestUtils.getXmlLong(maxResourceCapability, "vCores");
assertTrue(memory != 0);
assertTrue(vCores != 0);
return appId;
}
// Test to validate the process of submitting apps - test for appropriate
// errors as well
@Test
public void testGetNewApplicationAndSubmit() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
amNodeManager.nodeHeartbeat(true);
String mediaTypes[] =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String acceptMedia : mediaTypes) {
for (String contentMedia : mediaTypes) {
testAppSubmit(acceptMedia, contentMedia);
testAppSubmitErrors(acceptMedia, contentMedia);
}
}
rm.stop();
}
public void testAppSubmit(String acceptMedia, String contentMedia)
throws Exception {
// create a test app and submit it via rest(after getting an app-id) then
// get the app details from the rmcontext and check that everything matches
String lrKey = "example";
String queueName = "testqueue";
// create the queue
String[] queues = { "default", "testqueue" };
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setQueues(ROOT, queues);
csconf.setCapacity(DEFAULT, 50.0f);
csconf.setCapacity(TEST_QUEUE, 50.0f);
when(hsRequest.getRequestURL()).thenReturn(new StringBuffer("/apps"));
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
String appName = "test";
String appType = "test-type";
String urlPath = "apps";
String appId = testGetNewApplication(acceptMedia);
List<String> commands = new ArrayList<>();
commands.add("/bin/sleep 5");
HashMap<String, String> environment = new HashMap<>();
environment.put("APP_VAR", "ENV_SETTING");
HashMap<ApplicationAccessType, String> acls = new HashMap<>();
acls.put(ApplicationAccessType.MODIFY_APP, "testuser1, testuser2");
acls.put(ApplicationAccessType.VIEW_APP, "testuser3, testuser4");
Set<String> tags = new HashSet<>();
tags.add("tag1");
tags.add("tag 2");
CredentialsInfo credentials = new CredentialsInfo();
HashMap<String, String> tokens = new HashMap<>();
HashMap<String, String> secrets = new HashMap<>();
secrets.put("secret1", Base64.encodeBase64String(
"mysecret".getBytes(StandardCharsets.UTF_8)));
credentials.setSecrets(secrets);
credentials.setTokens(tokens);
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationId(appId);
appInfo.setApplicationName(appName);
appInfo.setMaxAppAttempts(2);
appInfo.setQueue(queueName);
appInfo.setApplicationType(appType);
appInfo.setPriority(0);
HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
y.setTimestamp(System.currentTimeMillis());
y.setType(LocalResourceType.FILE);
y.setVisibility(LocalResourceVisibility.APPLICATION);
lr.put(lrKey, y);
appInfo.getContainerLaunchContextInfo().setResources(lr);
appInfo.getContainerLaunchContextInfo().setCommands(commands);
appInfo.getContainerLaunchContextInfo().setEnvironment(environment);
appInfo.getContainerLaunchContextInfo().setAcls(acls);
appInfo.getContainerLaunchContextInfo().getAuxillaryServiceData()
.put("test", Base64.encodeBase64URLSafeString("value12".getBytes(StandardCharsets.UTF_8)));
appInfo.getContainerLaunchContextInfo().setCredentials(credentials);
appInfo.getResource().setMemory(1024);
appInfo.getResource().setvCores(1);
appInfo.setApplicationTags(tags);
// Set LogAggregationContextInfo
String includePattern = "file1";
String excludePattern = "file2";
String rolledLogsIncludePattern = "file3";
String rolledLogsExcludePattern = "file4";
String className = "policy_class";
String parameters = "policy_parameter";
LogAggregationContextInfo logAggregationContextInfo
= new LogAggregationContextInfo();
logAggregationContextInfo.setIncludePattern(includePattern);
logAggregationContextInfo.setExcludePattern(excludePattern);
logAggregationContextInfo.setRolledLogsIncludePattern(
rolledLogsIncludePattern);
logAggregationContextInfo.setRolledLogsExcludePattern(
rolledLogsExcludePattern);
logAggregationContextInfo.setLogAggregationPolicyClassName(className);
logAggregationContextInfo.setLogAggregationPolicyParameters(parameters);
appInfo.setLogAggregationContextInfo(logAggregationContextInfo);
// Set attemptFailuresValidityInterval
long attemptFailuresValidityInterval = 5000;
appInfo.setAttemptFailuresValidityInterval(
attemptFailuresValidityInterval);
// Set ReservationId
String reservationId = ReservationId.newInstance(
System.currentTimeMillis(), 1).toString();
appInfo.setReservationId(reservationId);
Response response =
this.constructWebResource(urlPath).request(acceptMedia)
.post(Entity.entity(appInfo, contentMedia), Response.class);
if (!this.isAuthenticationEnabled()) {
assertResponseStatusCode(Response.Status.UNAUTHORIZED, response.getStatusInfo());
return;
}
assertResponseStatusCode(ACCEPTED, response.getStatusInfo());
assertTrue(response.getHeaders().getFirst(HttpHeaders.LOCATION) != null);
String locURL = (String) response.getHeaders().getFirst(HttpHeaders.LOCATION);
assertTrue(locURL.contains("/apps/application"));
appId = locURL.substring(locURL.indexOf("/apps/") + "/apps/".length());
locURL = "/ws/v1/cluster" + locURL.substring(locURL.indexOf("/apps/"));
WebTarget res = target(locURL);
res = res.queryParam("user.name", webserviceUserName);
response = res.request().get();
assertResponseStatusCode(OK, response.getStatusInfo());
RMApp app =
rm.getRMContext().getRMApps()
.get(ApplicationId.fromString(appId));
assertEquals(appName, app.getName());
assertEquals(webserviceUserName, app.getUser());
assertEquals(2, app.getMaxAppAttempts());
if (app.getQueue().contains("root.")) {
queueName = "root." + queueName;
}
assertEquals(queueName, app.getQueue());
assertEquals(appType, app.getApplicationType());
assertEquals(tags, app.getApplicationTags());
ContainerLaunchContext ctx =
app.getApplicationSubmissionContext().getAMContainerSpec();
assertEquals(commands, ctx.getCommands());
assertEquals(environment, ctx.getEnvironment());
assertEquals(acls, ctx.getApplicationACLs());
Map<String, LocalResource> appLRs = ctx.getLocalResources();
assertTrue(appLRs.containsKey(lrKey));
LocalResource exampleLR = appLRs.get(lrKey);
assertEquals(URL.fromURI(y.getUrl()), exampleLR.getResource());
assertEquals(y.getSize(), exampleLR.getSize());
assertEquals(y.getTimestamp(), exampleLR.getTimestamp());
assertEquals(y.getType(), exampleLR.getType());
assertEquals(y.getPattern(), exampleLR.getPattern());
assertEquals(y.getVisibility(), exampleLR.getVisibility());
Credentials cs = new Credentials();
ByteArrayInputStream str =
new ByteArrayInputStream(app.getApplicationSubmissionContext()
.getAMContainerSpec().getTokens().array());
DataInputStream di = new DataInputStream(str);
cs.readTokenStorageStream(di);
Text key = new Text("secret1");
assertTrue("Secrets missing from credentials object", cs
.getAllSecretKeys().contains(key));
assertEquals("mysecret", new String(cs.getSecretKey(key), StandardCharsets.UTF_8));
// Check LogAggregationContext
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
LogAggregationContext lac = asc.getLogAggregationContext();
assertEquals(includePattern, lac.getIncludePattern());
assertEquals(excludePattern, lac.getExcludePattern());
assertEquals(rolledLogsIncludePattern, lac.getRolledLogsIncludePattern());
assertEquals(rolledLogsExcludePattern, lac.getRolledLogsExcludePattern());
assertEquals(className, lac.getLogAggregationPolicyClassName());
assertEquals(parameters, lac.getLogAggregationPolicyParameters());
// Check attemptFailuresValidityInterval
assertEquals(attemptFailuresValidityInterval,
asc.getAttemptFailuresValidityInterval());
// Check ReservationId
assertEquals(reservationId, app.getReservationId().toString());
response =
this.constructWebResource("apps", appId).request(acceptMedia)
.get(Response.class);
assertResponseStatusCode(OK, response.getStatusInfo());
}
public void testAppSubmitErrors(String acceptMedia, String contentMedia)
throws Exception {
// submit a bunch of bad requests(correct format but bad values) via the
// REST API and make sure we get the right error response codes
String urlPath = "apps";
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
Response response =
this.constructWebResource(urlPath).request(acceptMedia)
.post(Entity.entity(appInfo, contentMedia), Response.class);
validateResponseStatus(response, BAD_REQUEST);
String appId = "random";
appInfo.setApplicationId(appId);
response =
this.constructWebResource(urlPath).request(acceptMedia)
.post(Entity.entity(appInfo, contentMedia), Response.class);
validateResponseStatus(response, BAD_REQUEST);
appId = "random_junk";
appInfo.setApplicationId(appId);
response =
this.constructWebResource(urlPath).request(acceptMedia)
.post(Entity.entity(appInfo, contentMedia), Response.class);
validateResponseStatus(response, BAD_REQUEST);
// bad resource info
appInfo.getResource().setMemory(
rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB) + 1);
appInfo.getResource().setvCores(1);
response =
this.constructWebResource(urlPath).request(acceptMedia)
.post(Entity.entity(appInfo, contentMedia), Response.class);
validateResponseStatus(response, BAD_REQUEST);
appInfo.getResource().setvCores(
rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES) + 1);
appInfo.getResource().setMemory(CONTAINER_MB);
response =
this.constructWebResource(urlPath).request(acceptMedia)
.post(Entity.entity(appInfo, contentMedia), Response.class);
validateResponseStatus(response, BAD_REQUEST);
}
@Test
public void testAppSubmitBadJsonAndXML() throws Exception {
// submit a bunch of bad XML and JSON via the
// REST API and make sure we get error response codes
String urlPath = "apps";
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
amNodeManager.nodeHeartbeat(true);
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationName("test");
appInfo.setPriority(3);
appInfo.setMaxAppAttempts(2);
appInfo.setQueue("testqueue");
appInfo.setApplicationType("test-type");
HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
y.setTimestamp(System.currentTimeMillis());
y.setType(LocalResourceType.FILE);
y.setVisibility(LocalResourceVisibility.APPLICATION);
lr.put("example", y);
appInfo.getContainerLaunchContextInfo().setResources(lr);
appInfo.getResource().setMemory(1024);
appInfo.getResource().setvCores(1);
String body =
"<?xml version=\"1.0\" encoding=\"UTF-8\" "
+ "standalone=\"yes\"?><blah/>";
Response response =
this.constructWebResource(urlPath).request(MediaType.APPLICATION_XML)
.post(Entity.entity(body, MediaType.APPLICATION_XML), Response.class);
assertResponseStatusCode(BAD_REQUEST, response.getStatusInfo());
ApplicationSubmissionContextInfo aa = new ApplicationSubmissionContextInfo();
aa.setApplicationId("a");
aa.setApplicationName("b");
body = "application-submission-context:\"{\"a\" : \"b\"}\"";
response =
this.constructWebResource(urlPath).request(MediaType.APPLICATION_XML)
.post(Entity.entity(aa, MediaType.APPLICATION_JSON), Response.class);
validateResponseStatus(response, BAD_REQUEST);
rm.stop();
}
@Test
public void testGetAppQueue() throws Exception {
boolean isCapacityScheduler =
rm.getResourceScheduler() instanceof CapacityScheduler;
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] contentTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String contentType : contentTypes) {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
Response response = this.constructWebResource("apps", app.getApplicationId().toString(),
"queue").request(contentType).get(Response.class);
assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
String expectedQueue = "root.default";
if(!isCapacityScheduler) {
expectedQueue = "root." + webserviceUserName;
}
if (contentType.contains(MediaType.APPLICATION_JSON)) {
verifyAppQueueJson(response, expectedQueue);
} else {
verifyAppQueueXML(response, expectedQueue);
}
}
rm.stop();
}
@Test(timeout = 90000)
public void testUpdateAppPriority() throws Exception {
if (!(rm.getResourceScheduler() instanceof CapacityScheduler)) {
// till the fair scheduler modifications for priority is completed
return;
}
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
cs.setClusterMaxPriority(conf);
// default root queue allows anyone to have admin acl
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
String[] queues = { "default", "test" };
csconf.setQueues(ROOT, queues);
csconf.setCapacity(DEFAULT, 50.0f);
csconf.setCapacity(TEST, 50.0f);
csconf.setAcl(ROOT, QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl(DEFAULT, QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl(TEST, QueueACL.ADMINISTER_QUEUE, "someuser");
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data1);
amNodeManager.nodeHeartbeat(true);
int modifiedPriority = 8;
AppPriority priority = new AppPriority(modifiedPriority);
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appPriorityToJSON(priority);
} else {
entity = priority;
}
Response response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"priority")
.request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(Response.Status.UNAUTHORIZED,
response.getStatusInfo());
continue;
}
assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppPriorityJson(response, modifiedPriority);
} else {
verifyAppPriorityXML(response, modifiedPriority);
}
response = this
.constructWebResource("apps", app.getApplicationId().toString(), "priority")
.request(mediaType).get(Response.class);
assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppPriorityJson(response, modifiedPriority);
} else {
verifyAppPriorityXML(response, modifiedPriority);
}
// check unauthorized
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser("someuser")
.build();
app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"priority")
.request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
assertResponseStatusCode(Response.Status.FORBIDDEN, response.getStatusInfo());
}
}
rm.stop();
}
@Test(timeout = 90000)
public void testAppMove() throws Exception {
boolean isCapacityScheduler =
rm.getResourceScheduler() instanceof CapacityScheduler;
// default root queue allows anyone to have admin acl
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
String[] queues = { "default", "test" };
csconf.setQueues(ROOT, queues);
csconf.setCapacity(DEFAULT, 50.0f);
csconf.setCapacity(TEST, 50.0f);
csconf.setAcl(ROOT, QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl(DEFAULT, QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl(TEST, QueueACL.ADMINISTER_QUEUE, "someuser");
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data1);
amNodeManager.nodeHeartbeat(true);
AppQueue targetQueue = new AppQueue("test");
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appQueueToJSON(targetQueue);
} else {
entity = targetQueue;
}
Response response =
this.constructWebResource("apps", app.getApplicationId().toString(),
"queue").request(mediaType).
put(Entity.entity(entity, contentType), Response.class);
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(Response.Status.UNAUTHORIZED,
response.getStatusInfo());
continue;
}
assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
String expectedQueue = "root.test";
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppQueueJson(response, expectedQueue);
} else {
verifyAppQueueXML(response, expectedQueue);
}
Assert.assertEquals(expectedQueue, app.getQueue());
// check unauthorized
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser("someuser")
.build();
app = MockRMAppSubmitter.submit(rm, data);
amNodeManager.nodeHeartbeat(true);
response = this.constructWebResource("apps", app.getApplicationId().toString(),
"queue").request().put(Entity.entity(entity, contentType), Response.class);
assertResponseStatusCode(Response.Status.FORBIDDEN, response.getStatusInfo());
if(isCapacityScheduler) {
Assert.assertEquals("root.default", app.getQueue());
}
else {
Assert.assertEquals("root.someuser", app.getQueue());
}
}
}
rm.stop();
}
protected static String appPriorityToJSON(AppPriority targetPriority)
throws Exception {
StringWriter stringWriter = new StringWriter();
APP_PRIORITY_WRITER.marshallToJSON(targetPriority, stringWriter);
return stringWriter.toString();
}
protected static String appQueueToJSON(AppQueue targetQueue) throws Exception {
StringWriter stringWriter = new StringWriter();
APP_QUEUE_WRITER.marshallToJSON(targetQueue, stringWriter);
return stringWriter.toString();
}
protected static void verifyAppPriorityJson(Response response,
int expectedPriority) throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject applicationpriority = json.getJSONObject("applicationpriority");
int responsePriority = applicationpriority.getInt("priority");
assertEquals(expectedPriority, responsePriority);
}
protected static void verifyAppPriorityXML(Response response,
int expectedPriority)
throws ParserConfigurationException, IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
String xml = response.readEntity(String.class);
DocumentBuilderFactory dbf = XMLUtils.newSecureDocumentBuilderFactory();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("applicationpriority");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
int responsePriority = WebServicesTestUtils.getXmlInt(element, "priority");
assertEquals(expectedPriority, responsePriority);
}
protected static void verifyAppQueueJson(Response response, String queue)
throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject json = response.readEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
String responseQueue = json.getJSONObject("appqueue").getString("queue");
assertEquals(queue, responseQueue);
}
protected static void verifyAppQueueXML(Response response, String queue)
throws ParserConfigurationException, IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
String xml = response.readEntity(String.class);
DocumentBuilderFactory dbf = XMLUtils.newSecureDocumentBuilderFactory();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("appqueue");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
String responseQueue = WebServicesTestUtils.getXmlString(element, "queue");
assertEquals(queue, responseQueue);
}
@Test(timeout = 90000)
public void testUpdateAppTimeout() throws Exception {
rm.start();
rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
// application submitted without timeout
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(CONTAINER_MB, rm)
.withAppName("")
.withUser(webserviceUserName)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
Response response =
this.constructWebResource("apps", app.getApplicationId().toString(),
"timeouts").request(mediaType).get(Response.class);
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
assertEquals(
MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject js =
response.readEntity(JSONObject.class).getJSONObject("timeouts");
JSONObject entity = js.getJSONObject("timeout");
verifyAppTimeoutJson(entity,
ApplicationTimeoutType.LIFETIME, "UNLIMITED", -1);
}
long timeOutFromNow = 60;
String expireTime = Times
.formatISO8601(System.currentTimeMillis() + timeOutFromNow * 1000);
Object entity = getAppTimeoutInfoEntity(ApplicationTimeoutType.LIFETIME,
contentType, expireTime);
response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"timeout")
.request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
if (!isAuthenticationEnabled()) {
assertResponseStatusCode(Response.Status.UNAUTHORIZED,
response.getStatusInfo());
continue;
}
assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppTimeoutJson(response, ApplicationTimeoutType.LIFETIME,
expireTime, timeOutFromNow);
} else {
verifyAppTimeoutXML(response, ApplicationTimeoutType.LIFETIME,
expireTime, timeOutFromNow);
}
// verify for negative cases
entity = getAppTimeoutInfoEntity(null,
contentType, null);
response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"timeout")
.request(mediaType)
.put(Entity.entity(entity, contentType), Response.class);
assertResponseStatusCode(BAD_REQUEST, response.getStatusInfo());
// invoke get
response =
this.constructWebResource("apps", app.getApplicationId().toString(),
"timeouts", ApplicationTimeoutType.LIFETIME.toString())
.request(mediaType).get(Response.class);
assertResponseStatusCode(Response.Status.OK, response.getStatusInfo());
if (mediaType.contains(MediaType.APPLICATION_JSON)) {
verifyAppTimeoutJson(response, ApplicationTimeoutType.LIFETIME,
expireTime, timeOutFromNow);
}
}
}
rm.stop();
}
private Object getAppTimeoutInfoEntity(ApplicationTimeoutType type,
MediaType contentType, String expireTime) throws Exception {
AppTimeoutInfo timeoutUpdate = new AppTimeoutInfo();
timeoutUpdate.setTimeoutType(type);
timeoutUpdate.setExpiryTime(expireTime);
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appTimeoutToJSON(timeoutUpdate);
} else {
entity = timeoutUpdate;
}
return entity;
}
protected static void verifyAppTimeoutJson(Response response,
ApplicationTimeoutType type, String expireTime, long timeOutFromNow)
throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
JSONObject jsonTimeout = response.readEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, jsonTimeout.length());
JSONObject json = jsonTimeout.getJSONObject("timeout");
verifyAppTimeoutJson(json, type, expireTime, timeOutFromNow);
}
protected static void verifyAppTimeoutJson(JSONObject json,
ApplicationTimeoutType type, String expireTime, long timeOutFromNow)
throws JSONException {
assertEquals("incorrect number of elements", 3, json.length());
assertEquals(type.toString(), json.getString("type"));
assertEquals(expireTime, json.getString("expiryTime"));
assertTrue(json.getLong("remainingTimeInSeconds") <= timeOutFromNow);
}
protected static void verifyAppTimeoutXML(Response response,
ApplicationTimeoutType type, String expireTime, long timeOutFromNow)
throws ParserConfigurationException, IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE + ";" + JettyUtils.UTF_8,
response.getMediaType().toString());
String xml = response.readEntity(String.class);
DocumentBuilderFactory dbf = XMLUtils.newSecureDocumentBuilderFactory();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("timeout");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
assertEquals(type.toString(),
WebServicesTestUtils.getXmlString(element, "type"));
assertEquals(expireTime,
WebServicesTestUtils.getXmlString(element, "expiryTime"));
assertTrue(WebServicesTestUtils.getXmlLong(element,
"remainingTimeInSeconds") < timeOutFromNow);
}
protected static String appTimeoutToJSON(AppTimeoutInfo timeout)
throws Exception {
StringWriter stringWriter = new StringWriter();
APP_TIMEOUT_WRITER.marshallToJSON(timeout, stringWriter);
return stringWriter.toString();
}
}