// TestTimelineCollector.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.collector;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.junit.jupiter.api.Test;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;
import org.mockito.internal.stubbing.answers.Returns;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link TimelineCollector}: metric aggregation and the
 * collector's interaction with a mocked {@link TimelineWriter} for
 * synchronous, asynchronous and domain writes.
 */
public class TestTimelineCollector {

  /**
   * Builds {@code groups * entities} test entities. Entities of group
   * {@code j} have type {@code "TEST_" + j}; the {@code i}-th entity of every
   * group has the id {@code "container_1000178881110_2002_" + i}.
   *
   * <p>Each entity carries four metrics: {@code HDFS_BYTES_WRITE} (SUM, 100),
   * {@code VCORES_USED} (SUM, 3), {@code UNRELATED_VALUES} (no aggregation
   * operation set explicitly, 3) and {@code TXN_FINISH_TIME} (MAX, {@code i}).
   *
   * @param groups number of entity types to generate
   * @param entities number of entities per type
   * @return the generated entities
   */
  private TimelineEntities generateTestEntities(int groups, int entities) {
    TimelineEntities te = new TimelineEntities();
    final long cTime = 1425016501000L;
    // Every metric value is stamped 20 seconds in the past.
    final long metricTs = System.currentTimeMillis() - 20000;
    for (int j = 0; j < groups; j++) {
      String entityType = "TEST_" + j;
      for (int i = 0; i < entities; i++) {
        TimelineEntity entity = new TimelineEntity();
        entity.setId("container_1000178881110_2002_" + i);
        entity.setType(entityType);
        entity.setCreatedTime(cTime);

        // add metrics
        Set<TimelineMetric> metrics = new HashSet<>();
        TimelineMetric m1 = new TimelineMetric();
        m1.setId("HDFS_BYTES_WRITE");
        m1.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
        m1.addValue(metricTs, 100L);
        metrics.add(m1);

        TimelineMetric m2 = new TimelineMetric();
        m2.setId("VCORES_USED");
        m2.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
        m2.addValue(metricTs, 3L);
        metrics.add(m2);

        // m3 should not show up in the aggregation
        TimelineMetric m3 = new TimelineMetric();
        m3.setId("UNRELATED_VALUES");
        m3.addValue(metricTs, 3L);
        metrics.add(m3);

        TimelineMetric m4 = new TimelineMetric();
        m4.setId("TXN_FINISH_TIME");
        m4.setRealtimeAggregationOp(TimelineMetricOperation.MAX);
        m4.addValue(metricTs, i);
        metrics.add(m4);

        entity.addMetrics(metrics);
        te.addEntity(entity);
      }
    }

    return te;
  }

  /**
   * Builds the configuration shared by the synchronous write tests:
   * 5 client retries with a 500 ms retry interval.
   *
   * @return a fresh configuration with the retry settings applied
   */
  private static Configuration createRetryConf() {
    Configuration conf = new Configuration();
    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5);
    conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        500L);
    return conf;
  }

  /**
   * Test {@link TimelineCollector#aggregateEntities} with several entity
   * groups and with a single group, checking the SUM and MAX results and
   * that the metric without an aggregation operation is left out.
   */
  @Test
  void testAggregation() throws Exception {
    // Test aggregation with multiple groups.
    int groups = 3;
    int n = 50;
    TimelineEntities testEntities = generateTestEntities(groups, n);
    TimelineEntity resultEntity = TimelineCollector.aggregateEntities(
        testEntities, "test_result", "TEST_AGGR", true);
    assertThat(resultEntity.getMetrics()).hasSize(groups * 3);

    // One pass over the result covers the metrics of every group.
    Set<TimelineMetric> groupedMetrics = resultEntity.getMetrics();
    for (TimelineMetric m : groupedMetrics) {
      if (m.getId().startsWith("HDFS_BYTES_WRITE")) {
        assertEquals(100 * n, m.getSingleDataValue().intValue());
      } else if (m.getId().startsWith("VCORES_USED")) {
        assertEquals(3 * n, m.getSingleDataValue().intValue());
      } else if (m.getId().startsWith("TXN_FINISH_TIME")) {
        assertEquals(n - 1, m.getSingleDataValue());
      } else {
        fail("Unrecognized metric! " + m.getId());
      }
    }

    // Test aggregation with a single group.
    TimelineEntities testEntities1 = generateTestEntities(1, n);
    TimelineEntity resultEntity1 = TimelineCollector.aggregateEntities(
        testEntities1, "test_result", "TEST_AGGR", false);
    assertThat(resultEntity1.getMetrics()).hasSize(3);

    Set<TimelineMetric> singleGroupMetrics = resultEntity1.getMetrics();
    for (TimelineMetric m : singleGroupMetrics) {
      if (m.getId().equals("HDFS_BYTES_WRITE")) {
        assertEquals(100 * n, m.getSingleDataValue().intValue());
      } else if (m.getId().equals("VCORES_USED")) {
        assertEquals(3 * n, m.getSingleDataValue().intValue());
      } else if (m.getId().equals("TXN_FINISH_TIME")) {
        assertEquals(n - 1, m.getSingleDataValue());
      } else {
        fail("Unrecognized metric! " + m.getId());
      }
    }

  }

  /**
   * Test TimelineCollector's interaction with TimelineWriter upon
   * putEntity() calls.
   */
  @Test
  void testPutEntity() throws IOException {
    TimelineWriter writer = mock(TimelineWriter.class);
    TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth.
        TimelineHealthStatus.RUNNING, "");
    when(writer.getHealthStatus()).thenReturn(timelineHealth);

    TimelineCollector collector = new TimelineCollectorForTest(writer);
    collector.init(createRetryConf());

    TimelineEntities entities = generateTestEntities(1, 1);
    collector.putEntities(
        entities, UserGroupInformation.createRemoteUser("test-user"));

    verify(writer, times(1)).write(any(TimelineCollectorContext.class),
        any(TimelineEntities.class), any(UserGroupInformation.class));
    verify(writer, times(1)).flush();
  }


  /**
   * Test that putEntities() fails with a "Failed to putEntities" error when
   * the writer reports a connection failure as its health status.
   */
  @Test
  void testPutEntityWithStorageDown() throws IOException {
    TimelineWriter writer = mock(TimelineWriter.class);
    TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth.
        TimelineHealthStatus.CONNECTION_FAILURE, "");
    when(writer.getHealthStatus()).thenReturn(timelineHealth);

    TimelineCollector collector = new TimelineCollectorForTest(writer);
    collector.init(createRetryConf());

    TimelineEntities entities = generateTestEntities(1, 1);
    boolean exceptionCaught = false;
    try {
      collector.putEntities(entities, UserGroupInformation.
          createRemoteUser("test-user"));
    } catch (Exception e) {
      // Null-safe: an exception without a message must surface as the
      // assertion failure below, not as a NullPointerException.
      String message = e.getMessage();
      exceptionCaught =
          message != null && message.contains("Failed to putEntities");
    }
    assertTrue(exceptionCaught, "TimelineCollector putEntity failed to " +
        "handle storage down");
  }

  /**
   * Test TimelineCollector's interaction with TimelineWriter upon
   * putEntityAsync() calls.
   */
  @Test
  void testPutEntityAsync() throws Exception {
    TimelineWriter writer = mock(TimelineWriter.class);
    TimelineCollector collector = new TimelineCollectorForTest(writer);
    collector.init(new Configuration());
    collector.start();
    try {
      TimelineEntities entities = generateTestEntities(1, 1);
      collector.putEntitiesAsync(
          entities, UserGroupInformation.createRemoteUser("test-user"));
      // Give the asynchronous write time to reach the writer.
      Thread.sleep(1000);
      verify(writer, times(1)).write(any(TimelineCollectorContext.class),
          any(TimelineEntities.class), any(UserGroupInformation.class));
      verify(writer, never()).flush();
    } finally {
      // Stop the collector even when a verification fails.
      collector.stop();
    }
  }

  /**
   * Test TimelineCollector's discarding entities in case of async writes if
   * write is taking too much time.
   */
  @Test
  void testAsyncEntityDiscard() throws Exception {
    TimelineWriter writer = mock(TimelineWriter.class);

    // Every write takes 500 ms so that the async queue fills up.
    when(writer.write(any(), any(), any())).thenAnswer(
        new AnswersWithDelay(500, new Returns(new TimelineWriteResponse())));
    TimelineCollector collector = new TimelineCollectorForTest(writer);
    Configuration config = new Configuration();
    config
        .setInt(YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
            3);
    collector.init(config);
    collector.start();
    try {
      for (int i = 0; i < 10; ++i) {
        TimelineEntities entities = generateTestEntities(i + 1, 1);
        collector.putEntitiesAsync(entities,
            UserGroupInformation.createRemoteUser("test-user"));
      }
      Thread.sleep(3000);
      // Only 4 of the 10 submissions are expected to reach the writer
      // (presumably one in flight plus the queue capacity of 3).
      verify(writer, times(4))
          .write(any(TimelineCollectorContext.class),
              any(TimelineEntities.class),
              any(UserGroupInformation.class));
      verify(writer, never()).flush();
    } finally {
      // Stop the collector even when a verification fails.
      collector.stop();
    }
  }

  /**
   * Test TimelineCollector's interaction with TimelineWriter upon
   * putDomain() calls.
   */
  @Test
  void testPutDomain() throws IOException {
    TimelineWriter writer = mock(TimelineWriter.class);
    TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth.
        TimelineHealthStatus.RUNNING, "");
    when(writer.getHealthStatus()).thenReturn(timelineHealth);

    TimelineCollector collector = new TimelineCollectorForTest(writer);
    collector.init(createRetryConf());

    TimelineDomain domain =
        generateDomain("id", "desc", "owner", "reader1,reader2", "writer", 0L,
            1L);
    collector.putDomain(domain, UserGroupInformation.createRemoteUser("owner"));

    verify(writer, times(1))
        .write(any(TimelineCollectorContext.class), any(TimelineDomain.class));
    verify(writer, times(1)).flush();
  }

  /**
   * Creates a {@link TimelineDomain} populated with the given fields.
   *
   * @param id domain id
   * @param desc domain description
   * @param owner domain owner
   * @param reader readers string passed to {@code setReaders}
   * @param writer writers string passed to {@code setWriters}
   * @param cTime created time
   * @param mTime modified time
   * @return the populated domain
   */
  private static TimelineDomain generateDomain(String id, String desc,
      String owner, String reader, String writer, Long cTime, Long mTime) {
    TimelineDomain domain = new TimelineDomain();
    domain.setId(id);
    domain.setDescription(desc);
    domain.setOwner(owner);
    domain.setReaders(reader);
    domain.setWriters(writer);
    domain.setCreatedTime(cTime);
    domain.setModifiedTime(mTime);
    return domain;
  }

  /**
   * Minimal {@link TimelineCollector} that uses the supplied writer and
   * always returns the same default-constructed context.
   */
  private static class TimelineCollectorForTest extends TimelineCollector {
    private final TimelineCollectorContext context =
        new TimelineCollectorContext();

    TimelineCollectorForTest(TimelineWriter writer) {
      super("TimelineCollectorForTest");
      setWriter(writer);
    }

    @Override
    public TimelineCollectorContext getTimelineEntityContext() {
      return context;
    }
  }

  /**
   * Creates an entity with only its id and type set.
   *
   * @param id entity id
   * @param type entity type
   * @return the new entity
   */
  private static TimelineEntity createEntity(String id, String type) {
    TimelineEntity entity = new TimelineEntity();
    entity.setId(id);
    entity.setType(type);
    return entity;
  }

  /**
   * Creates a SUM-aggregated metric named {@code dummy_metric} holding a
   * single value.
   *
   * @param ts timestamp of the value
   * @param value the metric value
   * @return the new metric
   */
  private static TimelineMetric createDummyMetric(long ts, Long value) {
    TimelineMetric metric = new TimelineMetric();
    metric.setId("dummy_metric");
    metric.addValue(ts, value);
    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
    return metric;
  }

  /**
   * Test that each aggregation only picks up the entities put since the
   * previous aggregation: a second aggregation without new puts yields no
   * values, and a later one only sums the newly put entities.
   */
  @Test
  void testClearPreviousEntitiesOnAggregation() throws Exception {
    final long ts = System.currentTimeMillis();
    TimelineCollector collector = new TimelineCollector("") {
      @Override
      public TimelineCollectorContext getTimelineEntityContext() {
        return new TimelineCollectorContext("cluster", "user", "flow", "1",
            1L, ApplicationId.newInstance(ts, 1).toString());
      }
    };

    TimelineWriter writer = mock(TimelineWriter.class);
    TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth.
        TimelineHealthStatus.RUNNING, "");
    when(writer.getHealthStatus()).thenReturn(timelineHealth);

    collector.init(createRetryConf());
    collector.setWriter(writer);

    try {
      // Put 5 entities with different metric values.
      TimelineEntities entities = new TimelineEntities();
      for (int i = 1; i <= 5; i++) {
        TimelineEntity entity = createEntity("e" + i, "type");
        entity.addMetric(createDummyMetric(ts + i, i * 50L));
        entities.addEntity(entity);
      }
      collector.putEntities(entities, UserGroupInformation.getCurrentUser());

      TimelineCollectorContext currContext =
          collector.getTimelineEntityContext();
      // Aggregate the entities.
      Map<String, AggregationStatusTable> aggregationGroups
          = collector.getAggregationGroups();
      assertEquals(Sets.newHashSet("type"), aggregationGroups.keySet());
      TimelineEntity aggregatedEntity = TimelineCollector.
          aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
          TimelineEntityType.YARN_APPLICATION.toString());
      TimelineMetric aggregatedMetric =
          aggregatedEntity.getMetrics().iterator().next();
      // 50 + 100 + 150 + 200 + 250
      assertEquals(750L,
          aggregatedMetric.getValues().values().iterator().next());
      assertEquals(TimelineMetricOperation.SUM,
          aggregatedMetric.getRealtimeAggregationOp());

      // Aggregate entities.
      aggregatedEntity = TimelineCollector.
          aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
          TimelineEntityType.YARN_APPLICATION.toString());
      aggregatedMetric = aggregatedEntity.getMetrics().iterator().next();
      // No values aggregated as no metrics put for an entity between this
      // aggregation and the previous one.
      assertTrue(aggregatedMetric.getValues().isEmpty());
      assertEquals(TimelineMetricOperation.NOP,
          aggregatedMetric.getRealtimeAggregationOp());

      // Put 3 entities.
      entities = new TimelineEntities();
      for (int i = 1; i <= 3; i++) {
        TimelineEntity entity = createEntity("e" + i, "type");
        entity.addMetric(
            createDummyMetric(System.currentTimeMillis() + i, 50L));
        entities.addEntity(entity);
      }
      // The groups are fetched before the put; the assertion below relies on
      // them reflecting the entities that are put afterwards.
      aggregationGroups = collector.getAggregationGroups();
      collector.putEntities(entities, UserGroupInformation.getCurrentUser());

      // Aggregate entities.
      aggregatedEntity = TimelineCollector.
          aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
          TimelineEntityType.YARN_APPLICATION.toString());
      // Last 3 entities picked up for aggregation.
      aggregatedMetric = aggregatedEntity.getMetrics().iterator().next();
      assertEquals(150L,
          aggregatedMetric.getValues().values().iterator().next());
      assertEquals(TimelineMetricOperation.SUM,
          aggregatedMetric.getRealtimeAggregationOp());
    } finally {
      // Close the collector even when an assertion fails.
      collector.close();
    }
  }
}