// TestRouterWebAppProxy.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.server.webproxy.FedAppReportFetcher;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Exercises the Router's application report path with federation enabled and
 * checks that the tracking URL of each application resolves to the expected
 * web endpoint (AppMaster, ResourceManager or application history server).
 *
 * <p>A single embedded Jetty server stands in for all three backends; each
 * backend is a {@link MockWebServlet} that answers with a role prefix followed
 * by the request path info, so the test can tell which backend served a
 * request.
 */
public class TestRouterWebAppProxy {

  private static final Logger LOG = LoggerFactory.getLogger(TestRouterWebAppProxy.class);

  public static final String AM_PREFIX = "AM";
  public static final String RM_PREFIX = "RM";
  public static final String AHS_PREFIX = "AHS";

  /**
   * host:port configured as the Router web address for this test. Proxy
   * tracking URLs are built from the same value so that following them reaches
   * the Router rather than the mock backend.
   */
  private static final String ROUTER_WEB_ADDRESS = "localhost:9090";

  /*
   * Mocked Server is used for simulating the web of AppMaster, ResourceManager or TimelineServer.
   */
  private static Server mockServer;
  private static int mockServerPort = 0;

  /**
   * Starts a simple http server on an ephemeral port. The server answers with
   * status 200 on the three paths used by the test.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    mockServer = new Server(0);
    ((QueuedThreadPool) mockServer.getThreadPool()).setMaxThreads(20);

    ServletContextHandler context = new ServletContextHandler();
    context.setContextPath("/");
    context.addServlet(new ServletHolder(new MockWebServlet(AM_PREFIX)), "/amweb/*");
    context.addServlet(new ServletHolder(new MockWebServlet(RM_PREFIX)), "/cluster/app/*");
    context.addServlet(new ServletHolder(new MockWebServlet(AHS_PREFIX)),
        "/applicationhistory/app/*");
    mockServer.setHandler(context);

    ServerConnector connector = (ServerConnector) mockServer.getConnectors()[0];
    connector.setHost("localhost");
    mockServer.start();
    // The port is only known after start() because the server was created with port 0.
    mockServerPort = connector.getLocalPort();
    LOG.info("Running embedded servlet container at: http://localhost:{}", mockServerPort);
  }

  /**
   * Registers four applications across two sub-clusters and verifies that the
   * tracking URL reported through the Router serves the AM web for running
   * applications, the RM web for an accepted application, and the application
   * history web for an application only known to the history server.
   */
  @Test
  @Timeout(value = 10)
  public void testRouterWebAppProxyFed() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.ROUTER_WEBAPP_ADDRESS, ROUTER_WEB_ADDRESS);
    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
    conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
    conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:" + mockServerPort);
    // overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
    conf.setInt("hadoop.http.max.threads", 10);

    // Create sub cluster information.
    SubClusterId subClusterId1 = SubClusterId.newInstance("scid1");
    SubClusterId subClusterId2 = SubClusterId.newInstance("scid2");
    SubClusterInfo subClusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1",
        "10.0.0.1:1", "10.0.0.1:1", "localhost:" + mockServerPort, SubClusterState.SC_RUNNING, 0,
        "");
    SubClusterInfo subClusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1",
        "10.0.0.2:1", "10.0.0.2:1", "10.0.0.2:1", SubClusterState.SC_RUNNING, 0, "");

    String mockWebBase = "http://localhost:" + mockServerPort;
    // The mock server registers no "/proxy" mapping, so proxy URLs must point at
    // the Router web address instead.
    // NOTE(review): assumes the Router serves its proxy under "/proxy/" on
    // ROUTER_WEBAPP_ADDRESS - confirm against the Router web app setup.
    String proxyBase = "http://" + ROUTER_WEB_ADDRESS + "/proxy/";

    // App1 and App2 are running applications.
    ApplicationId appId1 = ApplicationId.newInstance(0, 1);
    ApplicationId appId2 = ApplicationId.newInstance(0, 2);
    String appUrl1 = mockWebBase + "/amweb/" + appId1;
    String proxyAppUrl1 = proxyBase + appId1;
    String appUrl2 = mockWebBase + "/amweb/" + appId2;
    String proxyAppUrl2 = proxyBase + appId2;
    // App3 is an accepted application, it has not registered an original url yet.
    ApplicationId appId3 = ApplicationId.newInstance(0, 3);
    String proxyAppUrl3 = proxyBase + appId3;
    // App4 is a finished application, removed from rm but still present in timeline server.
    ApplicationId appId4 = ApplicationId.newInstance(0, 4);
    String proxyAppUrl4 = proxyBase + appId4;

    // Mock for application: rm of sub-cluster 1 knows app1 and app3.
    ApplicationClientProtocol appManager1 = mock(ApplicationClientProtocol.class);
    when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId1)))
        .thenReturn(reportResponse(appId1, YarnApplicationState.RUNNING, proxyAppUrl1, appUrl1));
    when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId3)))
        .thenReturn(reportResponse(appId3, YarnApplicationState.ACCEPTED, proxyAppUrl3, null));

    // rm of sub-cluster 2 knows app2 and reports app4 as missing.
    ApplicationClientProtocol appManager2 = mock(ApplicationClientProtocol.class);
    when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId2)))
        .thenReturn(reportResponse(appId2, YarnApplicationState.RUNNING, proxyAppUrl2, appUrl2));
    when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
        .thenThrow(new ApplicationNotFoundException("APP NOT FOUND"));

    // Only the history server still has a report for app4.
    ApplicationHistoryProtocol historyManager = mock(ApplicationHistoryProtocol.class);
    when(historyManager.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
        .thenReturn(reportResponse(appId4, YarnApplicationState.FINISHED, proxyAppUrl4, null));

    // Initial federation store.
    FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
    facade.getStateStore()
        .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo1));
    facade.getStateStore()
        .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo2));
    facade.addApplicationHomeSubCluster(
        ApplicationHomeSubCluster.newInstance(appId1, subClusterId1));
    facade.addApplicationHomeSubCluster(
        ApplicationHomeSubCluster.newInstance(appId2, subClusterId2));
    facade.addApplicationHomeSubCluster(
        ApplicationHomeSubCluster.newInstance(appId3, subClusterId1));
    facade.addApplicationHomeSubCluster(
        ApplicationHomeSubCluster.newInstance(appId4, subClusterId2));

    // Start router for test
    Router router = new Router();
    router.init(conf);
    router.start();
    // Everything after start() runs inside try so the router is always closed.
    try {
      String user = UserGroupInformation.getCurrentUser().getUserName();
      RequestInterceptorChainWrapper wrapper = mock(RequestInterceptorChainWrapper.class);
      FederationClientInterceptor interceptor = mock(FederationClientInterceptor.class);
      when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId1)))
          .thenReturn(
              reportResponse(appId1, YarnApplicationState.RUNNING, proxyAppUrl1, appUrl1));
      when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId2)))
          .thenReturn(
              reportResponse(appId2, YarnApplicationState.RUNNING, proxyAppUrl2, appUrl2));
      when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId3)))
          .thenReturn(
              reportResponse(appId3, YarnApplicationState.ACCEPTED, proxyAppUrl3, null));
      when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
          .thenReturn(
              reportResponse(appId4, YarnApplicationState.FINISHED, proxyAppUrl4, null));
      when(wrapper.getRootInterceptor()).thenReturn(interceptor);
      router.getClientRMProxyService().getUserPipelineMap().put(user, wrapper);

      // set Mocked rm and timeline
      FedAppReportFetcher appReportFetcher = router.getFetcher();
      appReportFetcher.registerSubCluster(subClusterInfo1, appManager1);
      appReportFetcher.registerSubCluster(subClusterInfo2, appManager2);
      appReportFetcher.setHistoryManager(historyManager);

      // App1 is running in subcluster1, and original url is registered in rm of subCluster1.
      // So router will get original url from rm by getApplicationReport. Then router
      // will fetch the webapp directly.
      assertTrackingUrlServes(router, appId1, AM_PREFIX + "/" + appId1);

      // App2 is running in subcluster2, and original url is registered
      // in rm of subCluster2. So router will get original url from rm by
      // getApplicationReport. Then router will fetch the webapp directly.
      assertTrackingUrlServes(router, appId2, AM_PREFIX + "/" + appId2);

      // App3 is accepted in subcluster1, and original url is not registered
      // yet. So router will fetch the application web from rm.
      assertTrackingUrlServes(router, appId3, RM_PREFIX + "/" + appId3);

      // App4 is finished in subcluster2, and has been removed from rm, but not
      // removed from timeline server. So router will fetch the
      // application web from timeline server.
      assertTrackingUrlServes(router, appId4, AHS_PREFIX + "/" + appId4);
    } finally {
      router.close();
    }
  }

  /**
   * Asks the Router for the report of {@code appId}, issues a GET against the
   * reported tracking URL and asserts a 200 response with the expected body.
   *
   * @param router the started Router under test
   * @param appId application whose tracking URL is followed
   * @param expectedBody exact response body expected from the tracking URL
   * @throws Exception if the report cannot be fetched or the request fails
   */
  private void assertTrackingUrlServes(Router router, ApplicationId appId, String expectedBody)
      throws Exception {
    GetApplicationReportResponse response = router.getClientRMProxyService()
        .getApplicationReport(GetApplicationReportRequest.newInstance(appId));
    URL url = new URL(response.getApplicationReport().getTrackingUrl());
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.connect();
      assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
      assertEquals(expectedBody, readResponse(conn));
    } finally {
      // Release the connection even when an assertion above fails.
      conn.disconnect();
    }
  }

  /**
   * Wraps a freshly built application report into a report response.
   *
   * @param appId application the report describes
   * @param state application state to report
   * @param trackingUrl tracking URL placed in the report
   * @param origTrackingUrl original tracking URL, may be {@code null}
   * @return response carrying the report
   */
  private GetApplicationReportResponse reportResponse(ApplicationId appId,
      YarnApplicationState state, String trackingUrl, String origTrackingUrl) {
    return GetApplicationReportResponse.newInstance(
        newApplicationReport(appId, state, trackingUrl, origTrackingUrl));
  }

  /**
   * Builds an application report for user "testuser" carrying only the id,
   * state and the two tracking URLs; every other field is null or zero.
   */
  private ApplicationReport newApplicationReport(ApplicationId appId, YarnApplicationState state,
      String trackingUrl, String origTrackingUrl) {
    return ApplicationReport.newInstance(appId, null, "testuser", null, null, null, 0, null, state,
        null, trackingUrl, 0, 0, 0, null, null, origTrackingUrl, 0f, null, null);
  }

  /**
   * Reads the complete response body of {@code conn} as text.
   *
   * <p>The stream is drained until end-of-stream instead of relying on
   * {@link InputStream#available()}, which is only an estimate of the bytes
   * readable without blocking and may be smaller than the body (even zero).
   *
   * @param conn connection whose response body is read
   * @return the body decoded as UTF-8
   * @throws IOException if the body cannot be read
   */
  private String readResponse(HttpURLConnection conn) throws IOException {
    try (InputStream input = conn.getInputStream();
        ByteArrayOutputStream body = new ByteArrayOutputStream()) {
      byte[] buffer = new byte[4096];
      int read;
      while ((read = input.read(buffer)) != -1) {
        body.write(buffer, 0, read);
      }
      // Explicit charset keeps the result independent of the platform default.
      return new String(body.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  /*
   * This servlet is used to simulate the web of AppMaster, ResourceManager,
   * TimelineServer and so on.
   */
  public static class MockWebServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    /** Prefix written in front of the request path info by {@code doGet}. */
    private final String role;

    public MockWebServlet(String role) {
      this.role = role;
    }

    /**
     * Answers 200 and, when the request has path info, writes the role
     * followed by that path info as the body.
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {
      // Set the status before producing any output.
      resp.setStatus(HttpServletResponse.SC_OK);
      String pathInfo = req.getPathInfo();
      if (pathInfo != null) {
        resp.getWriter().write(role + pathInfo);
      }
    }

    /**
     * Answers 200 and echoes the request body back as the response body.
     */
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {
      // The status has to be set before the output stream is written and closed;
      // once the response is committed a later setStatus has no effect.
      resp.setStatus(HttpServletResponse.SC_OK);
      try (InputStream is = req.getInputStream();
          OutputStream os = resp.getOutputStream()) {
        byte[] buffer = new byte[4096];
        int read;
        while ((read = is.read(buffer)) != -1) {
          os.write(buffer, 0, read);
        }
      }
    }
  }
}