TestLogAggregationMetaCollector.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.FakeLogAggregationFileController;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class TestLogAggregationMetaCollector {
  private static final String TEST_NODE = "TEST_NODE_1";
  private static final String TEST_NODE_2 = "TEST_NODE_2";
  private static final String BIG_FILE_NAME = "TEST_BIG";
  private static final String SMALL_FILE_NAME = "TEST_SMALL";

  private static ApplicationId app = ApplicationId.newInstance(
      Clock.systemDefaultZone().millis(), 1);
  private static ApplicationId app2 = ApplicationId.newInstance(
      Clock.systemDefaultZone().millis(), 2);

  private static ApplicationAttemptId appAttempt =
      ApplicationAttemptId.newInstance(app, 1);
  private static ApplicationAttemptId app2Attempt =
      ApplicationAttemptId.newInstance(app2, 1);

  private static ContainerId attemptContainer =
      ContainerId.newContainerId(appAttempt, 1);
  private static ContainerId attemptContainer2 =
      ContainerId.newContainerId(appAttempt, 2);

  private static ContainerId attempt2Container =
      ContainerId.newContainerId(app2Attempt, 1);
  private static ContainerId attempt2Container2 =
      ContainerId.newContainerId(app2Attempt, 2);

  private FakeNodeFileController fileController;

  private static class FakeNodeFileController
      extends FakeLogAggregationFileController {
    private Map<ImmutablePair<String, String>,
        Map<String, List<ContainerLogFileInfo>>> logFiles;
    private List<FileStatus> appDirs;
    private List<FileStatus> nodeFiles;

    FakeNodeFileController(
        Map<ImmutablePair<String, String>, Map<String,
            List<ContainerLogFileInfo>>> logFiles, List<FileStatus> appDirs,
        List<FileStatus> nodeFiles) {
      this.logFiles = logFiles;
      this.appDirs = appDirs;
      this.nodeFiles = nodeFiles;
    }

    @Override
    public RemoteIterator<FileStatus> getApplicationDirectoriesOfUser(
        String user) throws IOException {
      return new RemoteIterator<FileStatus>() {
        private Iterator<FileStatus> iter = appDirs.iterator();

        @Override
        public boolean hasNext() throws IOException {
          return iter.hasNext();
        }

        @Override
        public FileStatus next() throws IOException {
          return iter.next();
        }
      };
    }

    @Override
    public RemoteIterator<FileStatus> getNodeFilesOfApplicationDirectory(
        FileStatus appDir) throws IOException {
      return new RemoteIterator<FileStatus>() {
        private Iterator<FileStatus> iter = nodeFiles.iterator();

        @Override
        public boolean hasNext() throws IOException {
          return iter.hasNext();
        }

        @Override
        public FileStatus next() throws IOException {
          return iter.next();
        }
      };
    }

    @Override
    public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
        ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
        ApplicationId appId) throws IOException {
      return logFiles.get(new ImmutablePair<>(appId.toString(),
          currentNodeFile.getPath().getName()));
    }
  }

  @BeforeEach
  public void setUp() throws Exception {
    fileController = createFileController();
  }

  @AfterEach
  public void tearDown() throws Exception {
  }

  @Test
  void testAllNull() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    request.setAppId(null);
    request.setContainerId(null);
    request.setFileName(null);
    request.setFileSize(null);
    request.setModificationTime(null);
    request.setNodeId(null);
    request.setUser(null);

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(8, allFile.size());
  }

  @Test
  void testAllSet() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    Set<String> fileSizeExpressions = new HashSet<>();
    fileSizeExpressions.add("<51");
    Set<String> modificationTimeExpressions = new HashSet<>();
    modificationTimeExpressions.add("<1000");
    request.setAppId(app.toString());
    request.setContainerId(attemptContainer.toString());
    request.setFileName(String.format("%s.*", SMALL_FILE_NAME));
    request.setFileSize(fileSizeExpressions);
    request.setModificationTime(modificationTimeExpressions);
    request.setNodeId(TEST_NODE);
    request.setUser("TEST");

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(1, allFile.size());
  }

  @Test
  void testSingleNodeRequest() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    request.setAppId(null);
    request.setContainerId(null);
    request.setFileName(null);
    request.setFileSize(null);
    request.setModificationTime(null);
    request.setNodeId(TEST_NODE);
    request.setUser(null);

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(4, allFile.stream().
        filter(f -> f.getFileName().contains(TEST_NODE)).count());
  }

  @Test
  void testMultipleNodeRegexRequest() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    request.setAppId(null);
    request.setContainerId(null);
    request.setFileName(null);
    request.setFileSize(null);
    request.setModificationTime(null);
    request.setNodeId("TEST_NODE_.*");
    request.setUser(null);

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(8, allFile.size());
  }

  @Test
  void testMultipleFileRegex() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    request.setAppId(null);
    request.setContainerId(null);
    request.setFileName(String.format("%s.*", BIG_FILE_NAME));
    request.setFileSize(null);
    request.setModificationTime(null);
    request.setNodeId(null);
    request.setUser(null);

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(4, allFile.size());
    assertTrue(allFile.stream().allMatch(
        f -> f.getFileName().contains(BIG_FILE_NAME)));
  }

  @Test
  void testContainerIdExactMatch() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    request.setAppId(null);
    request.setContainerId(attemptContainer.toString());
    request.setFileName(null);
    request.setFileSize(null);
    request.setModificationTime(null);
    request.setNodeId(null);
    request.setUser(null);

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(2, allFile.size());
    assertTrue(allFile.stream().allMatch(
        f -> f.getFileName().contains(attemptContainer.toString())));
  }

  @Test
  void testMultipleFileBetweenSize() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    Set<String> fileSizeExpressions = new HashSet<>();
    fileSizeExpressions.add(">50");
    fileSizeExpressions.add("<101");
    request.setAppId(null);
    request.setContainerId(null);
    request.setFileName(null);
    request.setFileSize(fileSizeExpressions);
    request.setModificationTime(null);
    request.setNodeId(null);
    request.setUser(null);

    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
        request.build(), new YarnConfiguration());
    List<ContainerLogMeta> res = collector.collect(fileController);

    List<ContainerLogFileInfo> allFile = res.stream()
        .flatMap(m -> m.getContainerLogMeta().stream())
        .collect(Collectors.toList());
    assertEquals(4, allFile.size());
    assertTrue(allFile.stream().allMatch(
        f -> f.getFileSize().equals("100")));
  }

  @Test
  void testInvalidQueryStrings() throws IOException {
    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
    Set<String> fileSizeExpressions = new HashSet<>();
    fileSizeExpressions.add("50");
    fileSizeExpressions.add("101");
    try {
      request.setFileName("*");
      fail("An error should be thrown due to an invalid regex");
    } catch (IllegalArgumentException ignored) {
    }

    try {
      request.setFileSize(fileSizeExpressions);
      fail("An error should be thrown due to multiple exact match expression");
    } catch (IllegalArgumentException ignored) {
    }
  }

  private FakeNodeFileController createFileController() {
    FileStatus appDir = new FileStatus();
    appDir.setPath(new Path(String.format("test/%s", app.toString())));
    FileStatus appDir2 = new FileStatus();
    appDir2.setPath(new Path(String.format("test/%s", app2.toString())));
    List<FileStatus> appDirs = new ArrayList<>();
    appDirs.add(appDir);
    appDirs.add(appDir2);

    FileStatus nodeFile = new FileStatus();
    nodeFile.setPath(new Path(String.format("test/%s", TEST_NODE)));
    FileStatus nodeFile2 = new FileStatus();
    nodeFile2.setPath(new Path(String.format("test/%s", TEST_NODE_2)));
    List<FileStatus> nodeFiles = new ArrayList<>();
    nodeFiles.add(nodeFile);
    nodeFiles.add(nodeFile2);

    Map<ImmutablePair<String, String>, Map<String,
        List<ContainerLogFileInfo>>> internal = new HashMap<>();
    internal.put(new ImmutablePair<>(app.toString(), TEST_NODE),
        createLogFiles(TEST_NODE, attemptContainer));
    internal.put(new ImmutablePair<>(app.toString(), TEST_NODE_2),
        createLogFiles(TEST_NODE_2, attemptContainer2));
    internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE),
        createLogFiles(TEST_NODE, attempt2Container));
    internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE_2),
        createLogFiles(TEST_NODE_2, attempt2Container2));
    return new FakeNodeFileController(internal, appDirs, nodeFiles);
  }

  private Map<String, List<ContainerLogFileInfo>> createLogFiles(
      String nodeId, ContainerId... containerId) {
    Map<String, List<ContainerLogFileInfo>> logFiles = new HashMap<>();
    for (ContainerId c : containerId) {

      List<ContainerLogFileInfo> files = new ArrayList<>();
      ContainerLogFileInfo bigFile = new ContainerLogFileInfo();
      bigFile.setFileName(generateFileName(
          BIG_FILE_NAME, nodeId, c.toString()));
      bigFile.setFileSize("100");
      bigFile.setLastModifiedTime("1000");
      ContainerLogFileInfo smallFile = new ContainerLogFileInfo();
      smallFile.setFileName(generateFileName(
          SMALL_FILE_NAME, nodeId, c.toString()));
      smallFile.setFileSize("50");
      smallFile.setLastModifiedTime("100");
      files.add(bigFile);
      files.add(smallFile);

      logFiles.put(c.toString(), files);
    }
    return logFiles;
  }

  private String generateFileName(
      String name, String nodeId, String containerId) {
    return String.format("%s_%s_%s", name, nodeId, containerId);
  }
}