TestDocumentOperations.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
 * Tests for the merge and aggregation behaviour of the timeline document
 * types: {@link TimelineEntityDocument}, {@link FlowActivityDocument} and
 * {@link FlowRunDocument}.
 *
 * <p>Each test starts from a freshly constructed ("actual") document,
 * checks its initial state, merges a pre-populated ("expected") document
 * obtained from {@link DocumentStoreTestUtils} into it, and then verifies
 * that the actual document reflects the expected one.
 */
public class TestDocumentOperations {

  private static final String MEMORY_ID = "MEMORY";
  private static final String FLOW_NAME = "DistributedShell";
  private static final String FLOW_VERSION = "1";

  /**
   * Verifies that merging a populated entity document into a new one
   * leaves both documents with the same number of info entries, metrics,
   * events, configs and relations (in both directions).
   *
   * @throws IOException if the expected document cannot be baked
   */
  @Test
  public void testTimelineEntityDocMergeOperation() throws IOException {
    TimelineEntityDocument actualEntityDoc =
        new TimelineEntityDocument();
    TimelineEntityDocument expectedEntityDoc =
        DocumentStoreTestUtils.bakeTimelineEntityDoc();

    // Initial state of a new document: a single info entry and nothing else.
    assertEquals(1, actualEntityDoc.getInfo().size());
    assertEquals(0, actualEntityDoc.getMetrics().size());
    assertEquals(0, actualEntityDoc.getEvents().size());
    assertEquals(0, actualEntityDoc.getConfigs().size());
    assertEquals(0, actualEntityDoc.getIsRelatedToEntities().size());
    assertEquals(0, actualEntityDoc.getRelatesToEntities().size());

    actualEntityDoc.merge(expectedEntityDoc);

    assertEquals(expectedEntityDoc.getInfo().size(),
        actualEntityDoc.getInfo().size());
    assertEquals(expectedEntityDoc.getMetrics().size(),
        actualEntityDoc.getMetrics().size());
    assertEquals(expectedEntityDoc.getEvents().size(),
        actualEntityDoc.getEvents().size());
    assertEquals(expectedEntityDoc.getConfigs().size(),
        actualEntityDoc.getConfigs().size());
    // Each relation direction is compared against its own counterpart so
    // that a defect in merging one direction cannot be masked by the other.
    assertEquals(expectedEntityDoc.getIsRelatedToEntities().size(),
        actualEntityDoc.getIsRelatedToEntities().size());
    assertEquals(expectedEntityDoc.getRelatesToEntities().size(),
        actualEntityDoc.getRelatesToEntities().size());
  }

  /**
   * Verifies that merging a populated flow activity document into a new
   * one copies its fields, and that a second merge picks up a flow
   * activity added to the source document in the meantime.
   *
   * @throws IOException if the expected document cannot be baked
   */
  @Test
  public void testFlowActivityDocMergeOperation() throws IOException {
    FlowActivityDocument actualFlowActivityDoc = new FlowActivityDocument();
    FlowActivityDocument expectedFlowActivityDoc =
        DocumentStoreTestUtils.bakeFlowActivityDoc();

    // Initial state of a new document: only the type is set.
    assertEquals(0, actualFlowActivityDoc.getDayTimestamp());
    assertEquals(0, actualFlowActivityDoc.getFlowActivities().size());
    assertNull(actualFlowActivityDoc.getFlowName());
    assertEquals(TimelineEntityType.YARN_FLOW_ACTIVITY.toString(),
        actualFlowActivityDoc.getType());
    assertNull(actualFlowActivityDoc.getUser());
    assertNull(actualFlowActivityDoc.getId());

    actualFlowActivityDoc.merge(expectedFlowActivityDoc);
    assertFlowActivityDocsMatch(expectedFlowActivityDoc,
        actualFlowActivityDoc);

    // Grow the source document and merge again; the target must follow.
    expectedFlowActivityDoc.addFlowActivity(FLOW_NAME,
        FLOW_VERSION, System.currentTimeMillis());

    actualFlowActivityDoc.merge(expectedFlowActivityDoc);
    assertFlowActivityDocsMatch(expectedFlowActivityDoc,
        actualFlowActivityDoc);
  }

  /**
   * Verifies that merging a populated flow run document into a new one
   * copies its fields, and that merging the same document a second time
   * aggregates the {@code MEMORY} metric (registered with the
   * {@link TimelineMetricOperation#SUM} operation) to twice its value.
   *
   * @throws IOException if the expected document cannot be baked
   */
  @Test
  public void testFlowRunDocMergeAndAggOperation() throws IOException {
    FlowRunDocument actualFlowRunDoc = new FlowRunDocument();
    FlowRunDocument expectedFlowRunDoc = DocumentStoreTestUtils
        .bakeFlowRunDoc();

    // Attach a single-valued SUM metric to the source document.
    final long timestamp = System.currentTimeMillis();
    final long value = 98586624;
    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setId(MEMORY_ID);
    timelineMetric.setType(TimelineMetric.Type.SINGLE_VALUE);
    timelineMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
    timelineMetric.addValue(timestamp, value);
    TimelineMetricSubDoc metricSubDoc = new TimelineMetricSubDoc(
        timelineMetric);
    expectedFlowRunDoc.getMetrics().put(MEMORY_ID, metricSubDoc);

    // Initial state of a new document: only the type is set.
    assertNull(actualFlowRunDoc.getClusterId());
    assertNull(actualFlowRunDoc.getFlowName());
    assertNull(actualFlowRunDoc.getFlowRunId());
    assertNull(actualFlowRunDoc.getFlowVersion());
    assertNull(actualFlowRunDoc.getId());
    assertNull(actualFlowRunDoc.getUsername());
    assertEquals(TimelineEntityType.YARN_FLOW_RUN.toString(),
        actualFlowRunDoc.getType());
    assertEquals(0, actualFlowRunDoc.getMinStartTime());
    assertEquals(0, actualFlowRunDoc.getMaxEndTime());
    assertEquals(0, actualFlowRunDoc.getMetrics().size());

    actualFlowRunDoc.merge(expectedFlowRunDoc);

    assertEquals(expectedFlowRunDoc.getClusterId(),
        actualFlowRunDoc.getClusterId());
    assertEquals(expectedFlowRunDoc.getFlowName(),
        actualFlowRunDoc.getFlowName());
    assertEquals(expectedFlowRunDoc.getFlowRunId(),
        actualFlowRunDoc.getFlowRunId());
    assertEquals(expectedFlowRunDoc.getFlowVersion(),
        actualFlowRunDoc.getFlowVersion());
    assertEquals(expectedFlowRunDoc.getId(), actualFlowRunDoc.getId());
    assertEquals(expectedFlowRunDoc.getUsername(),
        actualFlowRunDoc.getUsername());
    assertEquals(expectedFlowRunDoc.getType(),
        actualFlowRunDoc.getType());
    assertEquals(expectedFlowRunDoc.getMinStartTime(),
        actualFlowRunDoc.getMinStartTime());
    assertEquals(expectedFlowRunDoc.getMaxEndTime(),
        actualFlowRunDoc.getMaxEndTime());
    assertEquals(expectedFlowRunDoc.getMetrics().size(),
        actualFlowRunDoc.getMetrics().size());

    // Second merge of the same document: the metric is expected to be
    // aggregated rather than overwritten.
    actualFlowRunDoc.merge(expectedFlowRunDoc);

    assertEquals(value + value, actualFlowRunDoc.getMetrics()
        .get(MEMORY_ID).getSingleDataValue());
  }

  /**
   * Asserts that two flow activity documents agree on day timestamp,
   * number of flow activities, flow name, type, user and id.
   *
   * @param expected the document holding the expected values
   * @param actual the document under test
   */
  private static void assertFlowActivityDocsMatch(
      FlowActivityDocument expected, FlowActivityDocument actual) {
    assertEquals(expected.getDayTimestamp(), actual.getDayTimestamp());
    assertEquals(expected.getFlowActivities().size(),
        actual.getFlowActivities().size());
    assertEquals(expected.getFlowName(), actual.getFlowName());
    assertEquals(expected.getType(), actual.getType());
    assertEquals(expected.getUser(), actual.getUser());
    assertEquals(expected.getId(), actual.getId());
  }
}