// TestHsWebServicesLogsExtend.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.v2.hs.webapp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.reader.RemoteLogPathsMessageBodyReader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
import org.apache.hadoop.yarn.server.webapp.LogServlet;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.RemoteLogPathEntry;
import org.apache.hadoop.yarn.server.webapp.dao.RemoteLogPaths;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;


/**
 * Tests the log-related API endpoints of {@link HsWebServices} against the
 * aggregated log structure below, which is created before the tests run.
 *
 * application_1 is finished
 *    attempt_1
 *       container_1 finished on node_1 syslog
 *       container_2 finished on node_1 syslog
 *       container_3 finished on node_2 syslog
 *    attempt_2
 *       container_1 finished on node_1 syslog
 *
 * application_2 is running
 *    attempt_1
 *       container_1 finished on node_1 syslog
 *    attempt_2
 *       container_1 finished on node_1 syslog
 *       container_2 running on node_1 syslog
 *       container_3 running on node_2 syslog (with some already aggregated log)
 *
 */
public class TestHsWebServicesLogsExtend extends JerseyTestBase {

  // Shared configuration; log aggregation is switched on and the remote log
  // root is pointed at REMOTE_LOG_ROOT_DIR in setupClass().
  private static Configuration conf = new YarnConfiguration();
  // File system obtained from conf in setupClass(); used to write and to
  // clean up the aggregated log fixtures.
  private static FileSystem fs;

  // Directories used for the log fixtures; both are deleted in
  // tearDownClass().
  private static final String LOCAL_ROOT_LOG_DIR = "target/LocalLogs";
  private static final String REMOTE_LOG_ROOT_DIR = "target/logs/";

  // User owning the applications, name of the aggregated log file, and the
  // remote log dir suffix / file formats applied by
  // createReconfiguredServlet().
  private static final String USER = "fakeUser";
  private static final String FILE_NAME = "syslog";
  private static final String REMOTE_LOG_DIR_SUFFIX = "test-logs";
  private static final String[] FILE_FORMATS = {"IFile", "TFile"};

  // The two fake node managers: node id plus the web address that the mocks
  // return for it.
  private static final String NM_WEBADDRESS_1 = "test-nm-web-address-1:9999";
  private static final NodeId NM_ID_1 = NodeId.newInstance("fakeHost1", 9951);
  private static final String NM_WEBADDRESS_2 = "test-nm-web-address-2:9999";
  private static final NodeId NM_ID_2 = NodeId.newInstance("fakeHost2", 9952);

  // The mocked client protocol reports APPID_1 as FINISHED and APPID_2 as
  // RUNNING.
  private static final ApplicationId APPID_1 = ApplicationId.newInstance(1, 1);
  private static final ApplicationId APPID_2 = ApplicationId.newInstance(10, 2);

  // Attempt ids, named APP_ATTEMPT_<application>_<attempt>.
  private static final ApplicationAttemptId APP_ATTEMPT_1_1 =
      ApplicationAttemptId.newInstance(APPID_1, 1);
  private static final ApplicationAttemptId APP_ATTEMPT_1_2 =
      ApplicationAttemptId.newInstance(APPID_1, 2);
  private static final ApplicationAttemptId APP_ATTEMPT_2_1 =
      ApplicationAttemptId.newInstance(APPID_2, 1);
  private static final ApplicationAttemptId APP_ATTEMPT_2_2 =
      ApplicationAttemptId.newInstance(APPID_2, 2);

  // Container ids, named CONTAINER_<application>_<attempt>_<container>.
  private static final ContainerId CONTAINER_1_1_1 =
      ContainerId.newContainerId(APP_ATTEMPT_1_1, 1);
  private static final ContainerId CONTAINER_1_1_2 =
      ContainerId.newContainerId(APP_ATTEMPT_1_1, 2);
  private static final ContainerId CONTAINER_1_1_3 =
      ContainerId.newContainerId(APP_ATTEMPT_1_1, 3);
  private static final ContainerId CONTAINER_1_2_1 =
      ContainerId.newContainerId(APP_ATTEMPT_1_2, 1);
  private static final ContainerId CONTAINER_2_1_1 =
      ContainerId.newContainerId(APP_ATTEMPT_2_1, 1);
  private static final ContainerId CONTAINER_2_2_1 =
      ContainerId.newContainerId(APP_ATTEMPT_2_2, 1);
  private static final ContainerId CONTAINER_2_2_2 =
      ContainerId.newContainerId(APP_ATTEMPT_2_2, 2);
  private static final ContainerId CONTAINER_2_2_3 =
      ContainerId.newContainerId(APP_ATTEMPT_2_2, 3);

  /**
   * Assembles the Jersey application under test: the binder holding the
   * mocked collaborators, the {@link HsWebServices} resource, the generic
   * exception handler and the Jettison/JAXB JSON support.
   *
   * @return the resource configuration used by the test container
   */
  @Override
  protected Application configure() {
    final ResourceConfig resourceConfig = new ResourceConfig();
    resourceConfig.register(new JerseyBinder(createReconfiguredServlet()));
    resourceConfig.register(HsWebServices.class);
    resourceConfig.register(GenericExceptionHandler.class);
    resourceConfig.register(new JettisonFeature());
    resourceConfig.register(JAXBContextResolver.class);
    return resourceConfig;
  }

  private class JerseyBinder extends AbstractBinder {
    private Configuration newConf;

    JerseyBinder(Configuration newConf) {
      this.newConf = newConf;
    }

    @Override
    protected void configure() {

      HsWebApp webApp = mock(HsWebApp.class);
      when(webApp.name()).thenReturn("hsmockwebapp");

      MockHistoryContext appContext = new MockHistoryContext(0, 1, 2, 1);
      webApp = mock(HsWebApp.class);
      when(webApp.name()).thenReturn("hsmockwebapp");

      ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);

      try {
        doAnswer(invocationOnMock -> {
          GetApplicationReportRequest request = invocationOnMock.getArgument(0);

          // returning the latest application attempt for each application
          if (request.getApplicationId().equals(APPID_1)) {
            return GetApplicationReportResponse.newInstance(
                newApplicationReport(APPID_1, APP_ATTEMPT_1_2, false));
          } else if (request.getApplicationId().equals(APPID_2)) {
            return GetApplicationReportResponse.newInstance(
                newApplicationReport(APPID_2, APP_ATTEMPT_2_2, true));
          }
          throw new RuntimeException("Unknown applicationId: " + request.getApplicationId());
        }).when(mockProtocol).getApplicationReport(any());

        doAnswer(invocationOnMock -> {
          GetContainerReportRequest request = invocationOnMock.getArgument(0);
          ContainerId cId = request.getContainerId();
          // for running containers assign node id and NM web address
          if (cId.equals(CONTAINER_2_2_2)) {
            return GetContainerReportResponse.newInstance(
                newContainerReport(cId, NM_ID_1, NM_WEBADDRESS_1));
          } else if (cId.equals(CONTAINER_2_2_3)) {
            return GetContainerReportResponse.newInstance(
                newContainerReport(cId, NM_ID_2, NM_WEBADDRESS_2));
          }
          // for finished application don't assign node id and NM web address
          return GetContainerReportResponse.newInstance(
              newContainerReport(cId, null, null));
        }).when(mockProtocol).getContainerReport(any());
      } catch (Exception ignore) {
        fail("Failed to setup WebServletModule class");
      }

      Configuration usedConf = newConf == null ? conf : newConf;
      HsWebServices hsWebServices =
          new HsWebServices(appContext, usedConf, webApp, mockProtocol);
      try {
        LogServlet logServlet = hsWebServices.getLogServlet();
        logServlet = spy(logServlet);
        doReturn(null).when(logServlet).getNMWebAddressFromRM(any());
        doReturn(NM_WEBADDRESS_1).when(logServlet).getNMWebAddressFromRM(NM_ID_1.toString());
        doReturn(NM_WEBADDRESS_2).when(logServlet).getNMWebAddressFromRM(NM_ID_2.toString());
        hsWebServices.setLogServlet(logServlet);
      } catch (Exception ignore) {
        fail("Failed to setup WebServletModule class");
      }

      bind(webApp).to(WebApp.class).named("hsWebApp");
      bind(appContext).to(AppContext.class);
      bind(appContext).to(HistoryContext.class).named("ctx");
      bind(conf).to(Configuration.class).named("conf");
      bind(mockProtocol).to(ApplicationClientProtocol.class).named("appClient");
      final HttpServletResponse response = mock(HttpServletResponse.class);
      bind(response).to(HttpServletResponse.class);
      final HttpServletRequest request = mock(HttpServletRequest.class);
      bind(request).to(HttpServletRequest.class);
      hsWebServices.setResponse(response);
      bind(hsWebServices).to(HsWebServices.class);
    }
  }

  /**
   * Points the shared configuration at the test remote log root with log
   * aggregation enabled, obtains the file system for it and writes the
   * aggregated log fixtures.
   *
   * @throws Exception if the file system cannot be obtained or the fixtures
   *                   cannot be written
   */
  @BeforeClass
  public static void setupClass() throws Exception {
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT_DIR);
    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);

    fs = FileSystem.get(conf);

    createAggregatedFolders();
  }

  /**
   * Writes an aggregated "syslog" file for every container of both
   * applications, with the exception of CONTAINER_2_2_2, which is still
   * running.
   *
   * @throws Exception if failed to create aggregated log files
   */
  private static void createAggregatedFolders() throws Exception {
    // application_1: three containers on node_1, one on node_2
    Map<ContainerId, String> app1OnNode1 = new HashMap<>();
    for (ContainerId id : new ContainerId[] {
        CONTAINER_1_1_1, CONTAINER_1_1_2, CONTAINER_1_2_1}) {
      app1OnNode1.put(id, "Hello-" + id);
    }
    writeAggregatedLogs(APPID_1, app1OnNode1, NM_ID_1);
    writeAggregatedLogs(APPID_1, Collections.singletonMap(CONTAINER_1_1_3,
        "Hello-" + CONTAINER_1_1_3), NM_ID_2);

    // application_2: two containers on node_1, one on node_2
    Map<ContainerId, String> app2OnNode1 = new HashMap<>();
    for (ContainerId id : new ContainerId[] {
        CONTAINER_2_1_1, CONTAINER_2_2_1}) {
      app2OnNode1.put(id, "Hello-" + id);
    }
    writeAggregatedLogs(APPID_2, app2OnNode1, NM_ID_1);
    writeAggregatedLogs(APPID_2, Collections.singletonMap(CONTAINER_2_2_3,
        "Hello-" + CONTAINER_2_2_3), NM_ID_2);
  }

  /**
   * Writes the given container log contents for one application and one
   * node through {@link TestContainerLogsUtils}.
   *
   * @param appId application the containers belong to
   * @param logContents log content keyed by container id
   * @param nodeId node the logs are written for
   * @throws Exception if failed to create aggregated log files
   */
  private static void writeAggregatedLogs(ApplicationId appId,
      Map<ContainerId, String> logContents, NodeId nodeId) throws Exception {
    TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
        LOCAL_ROOT_LOG_DIR, appId, logContents, nodeId, FILE_NAME,
        USER, false);
  }

  /**
   * Removes the remote and local log directories created for the tests.
   *
   * @throws Exception if a directory cannot be deleted
   */
  @AfterClass
  public static void tearDownClass() throws Exception {
    if (fs == null) {
      // setupClass() failed before the file system was obtained, so there is
      // nothing to clean up and no reason to add an NPE to the report.
      return;
    }
    try {
      fs.delete(new Path(REMOTE_LOG_ROOT_DIR), true);
    } finally {
      // Still attempt to remove the local directory if the delete above throws.
      fs.delete(new Path(LOCAL_ROOT_LOG_DIR), true);
    }
  }

  /**
   * Queries the remote-log-dir endpoint with an explicit user and verifies
   * that every returned entry points at that user's bucket directory and
   * that an entry exists for each of the expected file formats.
   */
  @Test
  public void testRemoteLogDirWithUser() {
    // The web service configuration is handed to the binder in configure(),
    // so no configuration has to be built here.
    WebTarget r = target().register(RemoteLogPathsMessageBodyReader.class);

    Response response = r.path("ws").path("v1").path("history")
        .path("remote-log-dir").queryParam(YarnWebServiceParams.REMOTE_USER, USER)
        .request(MediaType.APPLICATION_JSON)
        .get(Response.class);

    RemoteLogPaths res = response.readEntity(RemoteLogPaths.class);

    List<String> collectedControllerNames = new ArrayList<>();
    for (RemoteLogPathEntry entry: res.getPaths()) {
      String expectedPath = String.format("%s/%s/bucket-%s-%s",
          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR, USER,
          REMOTE_LOG_DIR_SUFFIX, entry.getFileController().toLowerCase());
      collectedControllerNames.add(entry.getFileController());
      // expected value first, so that a failure message reads correctly
      assertEquals(expectedPath, entry.getPath());
    }

    assertTrue(collectedControllerNames.containsAll(
        Arrays.asList(FILE_FORMATS)));
  }

  /**
   * Queries the remote-log-dir endpoint without a user parameter, after
   * making the test user the login user, and verifies that every returned
   * entry points at that user's bucket directory and that an entry exists
   * for each of the expected file formats.
   */
  @Test
  public void testRemoteLogDir() {
    // The web service configuration is handed to the binder in configure(),
    // so no configuration has to be built here.

    // NOTE(review): the login user is process-wide state and is not restored
    // afterwards -- confirm that no other test depends on the previous value.
    UserGroupInformation ugi = UserGroupInformation.
        createRemoteUser(USER);
    UserGroupInformation.setLoginUser(ugi);

    WebTarget r = target().register(RemoteLogPathsMessageBodyReader.class);
    Response response = r.path("ws").path("v1").path("history")
        .path("remote-log-dir")
        .request(MediaType.APPLICATION_JSON).get(Response.class);

    RemoteLogPaths res = response.readEntity(RemoteLogPaths.class);

    List<String> collectedControllerNames = new ArrayList<>();
    for (RemoteLogPathEntry entry: res.getPaths()) {
      String expectedPath = String.format("%s/%s/bucket-%s-%s",
          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR, USER,
          REMOTE_LOG_DIR_SUFFIX, entry.getFileController().toLowerCase());
      collectedControllerNames.add(entry.getFileController());
      // expected value first, so that a failure message reads correctly
      assertEquals(expectedPath, entry.getPath());
    }

    assertTrue(collectedControllerNames.containsAll(
        Arrays.asList(FILE_FORMATS)));
  }

  /**
   * Queries the remote-log-dir endpoint with both a user and an application
   * id and verifies that every returned entry points at the application's
   * directory inside the user's bucket and that an entry exists for each of
   * the expected file formats.
   */
  @Test
  public void testRemoteLogDirWithUserAndAppId() {
    // The web service configuration is handed to the binder in configure(),
    // so no configuration has to be built here.
    WebTarget r = target().register(RemoteLogPathsMessageBodyReader.class);
    Response response = r.path("ws").path("v1").path("history")
        .path("remote-log-dir")
        .queryParam(YarnWebServiceParams.REMOTE_USER, USER)
        .queryParam(YarnWebServiceParams.APP_ID, APPID_1.toString())
        .request(MediaType.APPLICATION_JSON)
        .get(Response.class);
    RemoteLogPaths res = response.readEntity(new GenericType<RemoteLogPaths>(){});

    List<String> collectedControllerNames = new ArrayList<>();
    for (RemoteLogPathEntry entry: res.getPaths()) {
      String expectedPath = String.format("%s/%s/bucket-%s-%s/0001/%s",
          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR, USER,
          REMOTE_LOG_DIR_SUFFIX, entry.getFileController().toLowerCase(), APPID_1);
      collectedControllerNames.add(entry.getFileController());
      // expected value first, so that a failure message reads correctly
      assertEquals(expectedPath, entry.getPath());
    }

    assertTrue(collectedControllerNames.containsAll(Arrays.asList(FILE_FORMATS)));
  }

  /**
   * Creates a fake application report owned by the test user.
   *
   * @param appId id of the reported application
   * @param appAttemptId attempt to put into the report
   * @param running whether the application is reported as RUNNING; it is
   *                reported as FINISHED otherwise
   * @return the application report
   */
  private static ApplicationReport newApplicationReport(ApplicationId appId,
      ApplicationAttemptId appAttemptId, boolean running) {
    final YarnApplicationState state;
    if (running) {
      state = YarnApplicationState.RUNNING;
    } else {
      state = YarnApplicationState.FINISHED;
    }
    return ApplicationReport.newInstance(appId, appAttemptId, USER,
        "fakeQueue", "fakeApplicationName", "localhost", 0, null,
        state,
        "fake an application report", "", 1000L, 1000L, 1000L, null, null,
        "", 50f, "fakeApplicationType", null);
  }

  /**
   * Creates a fake container report.
   *
   * @param containerId id of the reported container
   * @param nodeId node to put into the report, may be null
   * @param nmWebAddress NM web address to put into the report, may be null
   * @return the container report
   */
  private static ContainerReport newContainerReport(ContainerId containerId,
      NodeId nodeId, String nmWebAddress) {
    final ContainerReport containerReport = ContainerReport.newInstance(
        containerId, null, nodeId, Priority.UNDEFINED, 0, 0, null, null, 0,
        null, nmWebAddress);
    return containerReport;
  }

  /**
   * Builds a fresh configuration for the web service under test: it sets
   * the log aggregation file formats, registers the indexed file controller
   * class for the "IFile" format and sets the remote log directory and its
   * suffix.
   *
   * @return the newly created configuration
   */
  private Configuration createReconfiguredServlet() {
    final Configuration reconfigured = new YarnConfiguration();

    final String iFileControllerKey = String.format(
        YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT, "IFile");

    reconfigured.setStrings(
        YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, FILE_FORMATS);
    reconfigured.setClass(iFileControllerKey,
        LogAggregationIndexedFileController.class,
        LogAggregationFileController.class);
    reconfigured.set(
        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
    reconfigured.set(
        YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
        REMOTE_LOG_DIR_SUFFIX);

    return reconfigured;
  }
}