TestDocumentStoreTimelineReaderImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.documentstore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DummyDocumentStoreReader;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;

import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

/**
 * Test case for {@link DocumentStoreTimelineReaderImpl}.
 */
@ExtendWith(MockitoExtension.class)
public class TestDocumentStoreTimelineReaderImpl {

  private final DocumentStoreReader<TimelineDocument> documentStoreReader = new
      DummyDocumentStoreReader<>();
  private final List<TimelineEntity> entities = DocumentStoreTestUtils
      .bakeTimelineEntities();
  private final TimelineEntityDocument appTimelineEntity =
      DocumentStoreTestUtils.bakeTimelineEntityDoc();

  private final Configuration conf = new Configuration();
  private final TimelineReaderContext context = new
      TimelineReaderContext(null, null, null,
      1L, null, null, null);
  private final DocumentStoreTimelineReaderImpl timelineReader = new
      DocumentStoreTimelineReaderImpl();

  public TestDocumentStoreTimelineReaderImpl() throws IOException {
  }

  private MockedStatic<DocumentStoreFactory> documentStoreFactoryMockedStatic;

  @BeforeEach
  public void setUp() throws YarnException {
    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
        "TestDB");
    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
        "https://localhost:443");
    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
        "1234567");
    documentStoreFactoryMockedStatic = mockStatic(DocumentStoreFactory.class);
    when(DocumentStoreFactory.createDocumentStoreReader(
        ArgumentMatchers.any(Configuration.class)))
        .thenReturn(documentStoreReader);
  }

  @AfterEach
  public void close() {
    documentStoreFactoryMockedStatic.close();
  }

  @Test
  public void testFailOnNoCosmosDBConfigs() throws Exception {
    assertThrows(YarnException.class, () -> {
      DocumentStoreUtils.validateCosmosDBConf(new Configuration());
    });
  }

  @Test
  public void testGetEntity() throws Exception {
    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    EnumSet<TimelineReader.Field> fieldsToRetrieve = EnumSet.noneOf(
        TimelineReader.Field.class);
    dataToRetrieve.setFieldsToRetrieve(fieldsToRetrieve);

    TimelineEntity timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);

    assertEquals(appTimelineEntity.getCreatedTime(), timelineEntity
        .getCreatedTime().longValue());
    assertEquals(0, timelineEntity .getMetrics().size());
    assertEquals(0, timelineEntity.getEvents().size());
    assertEquals(0, timelineEntity.getConfigs().size());
    assertEquals(appTimelineEntity.getInfo().size(),
        timelineEntity.getInfo().size());
  }

  @Test
  public void testGetEntityCustomField() throws Exception {
    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.METRICS);

    TimelineEntity timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);

    assertEquals(appTimelineEntity.getCreatedTime(), timelineEntity
        .getCreatedTime().longValue());
    assertEquals(appTimelineEntity.getMetrics().size(),
        timelineEntity.getMetrics().size());
    assertEquals(0, timelineEntity.getEvents().size());
    assertEquals(0, timelineEntity.getConfigs().size());
    assertEquals(appTimelineEntity.getInfo().size(),
        timelineEntity.getInfo().size());
  }

  @Test
  public void testGetEntityAllFields() throws Exception {
    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);

    TimelineEntity timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);

    assertEquals(appTimelineEntity.getCreatedTime(), timelineEntity
        .getCreatedTime().longValue());
    assertEquals(appTimelineEntity.getMetrics().size(),
        timelineEntity .getMetrics().size());
    assertEquals(appTimelineEntity.getEvents().size(),
        timelineEntity.getEvents().size());
    assertEquals(appTimelineEntity.getConfigs().size(),
        timelineEntity.getConfigs().size());
    assertEquals(appTimelineEntity.getInfo().size(),
        timelineEntity.getInfo().size());
  }

  @Test
  public void testGetAllEntities() throws Exception {
    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);

    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().build(), dataToRetrieve);

    assertEquals(entities.size(), actualEntities.size());
  }

  @Test
  public void testGetEntitiesWithLimit() throws Exception {
    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();

    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().entityLimit(2L).build(),
        dataToRetrieve);

    assertEquals(2, actualEntities.size());
  }

  @Test
  public void testGetEntitiesByWindows() throws Exception {
    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();

    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().createdTimeBegin(1533985554927L)
            .createTimeEnd(1533985554927L).build(), dataToRetrieve);

    assertEquals(1, actualEntities.size());
  }

  @Test
  public void testGetFilteredEntities() throws Exception {

    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    timelineReader.serviceInit(conf);
    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);

    // Get entities based on info filters.
    TimelineFilterList infoFilterList = new TimelineFilterList();
    infoFilterList.addFilter(
        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
            "YARN_APPLICATION_ATTEMPT_FINAL_STATUS", "SUCCEEDED"));
    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
        dataToRetrieve);

    assertEquals(1, actualEntities.size());
    // Only one entity with type YARN_APPLICATION_ATTEMPT should be returned.
    for (TimelineEntity entity : actualEntities) {
      if (!entity.getType().equals("YARN_APPLICATION_ATTEMPT")) {
        fail("Incorrect filtering based on info filters");
      }
    }

    // Get entities based on config filters.
    TimelineFilterList confFilterList = new TimelineFilterList();
    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    confFilterList.addFilter(
        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
            "YARN_AM_NODE_LABEL_EXPRESSION", "<DEFAULT_PARTITION>"));
    actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().configFilters(confFilterList)
            .build(), dataToRetrieve);

    assertEquals(1, actualEntities.size());
    // Only one entity with type YARN_APPLICATION should be returned.
    for (TimelineEntity entity : actualEntities) {
      if (!entity.getType().equals("YARN_APPLICATION")) {
        fail("Incorrect filtering based on info filters");
      }
    }

    // Get entities based on event filters.
    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    TimelineFilterList eventFilters = new TimelineFilterList();
    eventFilters.addFilter(
        new TimelineExistsFilter(TimelineCompareOp.EQUAL,
            "CONTAINER_LAUNCHED"));
    actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().eventFilters(eventFilters).build(),
        dataToRetrieve);

    assertEquals(1, actualEntities.size());
    // Only one entity with type YARN_CONTAINER should be returned.
    for (TimelineEntity entity : actualEntities) {
      if (!entity.getType().equals("YARN_CONTAINER")) {
        fail("Incorrect filtering based on info filters");
      }
    }

    // Get entities based on metric filters.
    TimelineFilterList metricFilterList = new TimelineFilterList();
    metricFilterList.addFilter(new TimelineCompareFilter(
        TimelineCompareOp.GREATER_OR_EQUAL, "MEMORY", 150298624L));
    actualEntities = timelineReader.getEntities(context,
        new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
            .build(), dataToRetrieve);

    assertEquals(1, actualEntities.size());
    // Only one entity with type YARN_CONTAINER should be returned.
    for (TimelineEntity entity : actualEntities) {
      if (!entity.getType().equals("YARN_CONTAINER")) {
        fail("Incorrect filtering based on info filters");
      }
    }
  }

  @Test
  public void testReadingDifferentEntityTypes() throws Exception {

    timelineReader.serviceInit(conf);

    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();

    // reading YARN_FLOW_ACTIVITY
    context.setEntityType(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
    TimelineEntity timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);

    assertEquals(TimelineEntityType.YARN_FLOW_ACTIVITY.toString(),
        timelineEntity.getType());

    // reading YARN_FLOW_RUN
    context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString());
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);

    assertEquals(TimelineEntityType.YARN_FLOW_RUN.toString(),
        timelineEntity.getType());

    // reading YARN_APPLICATION
    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);

    assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
        timelineEntity.getType());
  }

  @Test
  public void testReadingAllEntityTypes() throws Exception {

    timelineReader.serviceInit(conf);

    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
    Set<String> entityTypes = timelineReader.getEntityTypes(context);
    assertTrue(entityTypes.contains(TimelineEntityType.YARN_CONTAINER
        .toString()));
    assertTrue(entityTypes.contains(TimelineEntityType
        .YARN_APPLICATION_ATTEMPT.toString()));
  }

  @Test
  public void testMetricsToRetrieve() throws Exception {

    timelineReader.serviceInit(conf);

    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.METRICS);
    TimelineFilterList timelineFilterList = new TimelineFilterList();

    //testing metrics prefix for OR condition
    timelineFilterList.setOperator(TimelineFilterList.Operator.OR);
    timelineFilterList.addFilter(new TimelinePrefixFilter(
        TimelineCompareOp.EQUAL, "NOTHING"));
    dataToRetrieve.setMetricsToRetrieve(timelineFilterList);

    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    TimelineEntity timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertEquals(0, timelineEntity.getMetrics().size());

    timelineFilterList.addFilter(new TimelinePrefixFilter(
        TimelineCompareOp.EQUAL,
            "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED"));
    dataToRetrieve.setMetricsToRetrieve(timelineFilterList);

    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertTrue(timelineEntity.getMetrics().size() > 0);

    //testing metrics prefix for AND condition
    timelineFilterList.setOperator(TimelineFilterList.Operator.AND);
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertEquals(0, timelineEntity.getMetrics().size());

    dataToRetrieve.getMetricsToRetrieve().getFilterList().remove(0);
    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertTrue(timelineEntity.getMetrics().size() > 0);
  }

  @Test
  public void testConfigsToRetrieve() throws Exception {

    timelineReader.serviceInit(conf);

    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.CONFIGS);
    TimelineFilterList timelineFilterList = new TimelineFilterList();

    //testing metrics prefix for OR condition
    timelineFilterList.setOperator(TimelineFilterList.Operator.OR);
    timelineFilterList.addFilter(new TimelinePrefixFilter(
        TimelineCompareOp.EQUAL, "NOTHING"));
    dataToRetrieve.setConfsToRetrieve(timelineFilterList);

    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    TimelineEntity timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertEquals(0, timelineEntity.getConfigs().size());

    timelineFilterList.addFilter(new TimelinePrefixFilter(
        TimelineCompareOp.EQUAL, "YARN_AM_NODE_LABEL_EXPRESSION"));
    dataToRetrieve.setConfsToRetrieve(timelineFilterList);

    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertTrue(timelineEntity.getConfigs().size() > 0);

    //testing metrics prefix for AND condition
    timelineFilterList.setOperator(TimelineFilterList.Operator.AND);
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertEquals(0, timelineEntity.getConfigs().size());

    dataToRetrieve.getConfsToRetrieve().getFilterList().remove(0);
    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
    timelineEntity = timelineReader.getEntity(context,
        dataToRetrieve);
    assertTrue(timelineEntity.getConfigs().size() > 0);
  }
}