// TestDurationTracking.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.statistics;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.statistics.DurationStatisticSummary.fetchDurationSummary;
import static org.apache.hadoop.fs.statistics.DurationStatisticSummary.fetchSuccessSummary;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.*;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.*;
import static org.apache.hadoop.fs.statistics.impl.StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Test the IOStatistic DurationTracker logic.
 * <p>
 * Each test case runs against the {@link IOStatisticsStore} built in
 * {@link #setup()}, which has duration tracking enabled for the
 * {@link #REQUESTS} statistic only.
 */
public class TestDurationTracking extends AbstractHadoopTestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestDurationTracking.class);

  /** Name of the statistic for which duration tracking is enabled. */
  private static final String REQUESTS = "requests";

  /** Name of a statistic which is never registered with the store. */
  public static final String UNKNOWN = "unknown";

  /** Statistics store under test; rebuilt before every test case. */
  private IOStatisticsStore stats;

  /**
   * Counter incremented on every {@link #sleepf(int)} call
   * (and so on every {@link #sleep()} call too).
   */
  private final AtomicInteger invocationCounter = new AtomicInteger(0);

  /**
   * Build the statistics store with duration tracking of
   * {@link #REQUESTS}.
   */
  @BeforeEach
  public void setup() {
    stats = iostatisticsStore()
        .withDurationTracking(REQUESTS)
        .build();
  }

  /**
   * Log the final state of the statistics.
   */
  @AfterEach
  public void teardown() {
    LOG.info("stats {}", stats);
  }

  /**
   * Duration tracking through an explicitly closed tracker
   * and then through a try-with-resources clause.
   */
  @Test
  public void testDurationTryWithResources() throws Throwable {
    DurationTracker tracker =
        stats.trackDuration(REQUESTS);
    // the counter is expected to have been updated before the tracker
    // is closed.
    verifyStatisticCounterValue(stats, REQUESTS, 1L);
    sleep();
    tracker.close();
    try (DurationTracker ignored =
             stats.trackDuration(REQUESTS)) {
      sleep();
    }
    LOG.info("Statistics: {}", stats);
    DurationStatisticSummary summary = fetchSuccessSummary(stats, REQUESTS);
    // two tracked operations, each of which slept.
    assertSummaryValues(summary, 2, 1, 1);
    assertSummaryMean(summary, 2, 0);
  }

  /**
   * A little sleep method; interruptions do not raise an exception.
   * Increments {@link #invocationCounter} through {@link #sleepf(int)}.
   */
  public void sleep() {
    sleepf(10);
  }

  /**
   * A little sleep function; an {@link InterruptedException} is not
   * propagated, but the interrupt status of the thread is restored so
   * that the interruption is still visible to the caller/test runner.
   * Increments {@link #invocationCounter}.
   * @param millis time to sleep in milliseconds
   * @return the value of {@code millis}
   */
  protected int sleepf(final int millis) {
    invocationCounter.incrementAndGet();
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ignored) {
      // no exception is raised, but the interrupt flag must not be lost:
      // Thread.sleep() clears it when throwing.
      Thread.currentThread().interrupt();
    }
    return millis;
  }

  /**
   * Assert that the sleep counter has been invoked
   * the expected number of times.
   * @param expected expected value
   */
  private void assertCounterValue(final int expected) {
    assertThat(invocationCounter.get())
        .describedAs("Sleep invocation Counter")
        .isEqualTo(expected);
  }

  /**
   * Test that a function raising an IOE can be wrapped.
   */
  @Test
  public void testDurationFunctionIOE() throws Throwable {
    // the function sets the counter to its argument and returns
    // the previous value.
    FunctionRaisingIOE<Integer, Integer> fn =
        trackFunctionDuration(stats, REQUESTS,
            (Integer x) -> invocationCounter.getAndSet(x));
    // previous counter value was 0; it is now 1.
    assertThat(fn.apply(1)).isEqualTo(0);
    assertCounterValue(1);
    assertSummaryValues(
        fetchSuccessSummary(stats, REQUESTS),
        1, 0, 0);
  }

  /**
   * Trigger a failure and verify it is the failure statistics
   * which go up.
   */
  @Test
  public void testDurationFunctionIOEFailure() throws Throwable {
    FunctionRaisingIOE<Integer, Integer> fn =
        trackFunctionDuration(stats, REQUESTS,
            (Integer x) -> {
              sleep();
              return 100 / x;
            });
    // division by zero inside the tracked function
    intercept(ArithmeticException.class,
        () -> fn.apply(0));
    assertSummaryValues(
        fetchSuccessSummary(stats, REQUESTS),
        1, -1, -1);

    DurationStatisticSummary failures = fetchDurationSummary(stats, REQUESTS,
        false);
    assertSummaryValues(failures, 1, 0, 0);
    assertSummaryMean(failures, 1, 0);
  }

  /**
   * Trigger a failure in a wrapped {@code java.util.function.Function}
   * and verify it is the failure statistics which go up.
   */
  @Test
  public void testDurationJavaFunctionFailure() throws Throwable {
    Function<Integer, Integer> fn =
        trackJavaFunctionDuration(stats, REQUESTS,
            (Integer x) -> 100 / x);
    // division by zero inside the tracked function
    intercept(ArithmeticException.class,
        () -> fn.apply(0));
    assertSummaryValues(
        fetchSuccessSummary(stats, REQUESTS),
        1, -1, -1);

    DurationStatisticSummary failures = fetchDurationSummary(stats, REQUESTS,
        false);
    assertSummaryValues(failures, 1, 0, 0);
  }

  /**
   * Test trackDurationOfCallable.
   */
  @Test
  public void testCallableDuration() throws Throwable {
    // call the operation
    assertThat(
        trackDurationOfCallable(stats, REQUESTS, () -> sleepf(100)).call())
        .isEqualTo(100);
    DurationStatisticSummary summary = fetchSuccessSummary(stats, REQUESTS);
    assertSummaryValues(summary, 1, 0, 0);
    assertSummaryMean(summary, 1, 0);
  }

  /**
   * Callable raising an RTE after a sleep; failure
   * stats will be updated and the execution count will be
   * 1.
   */
  @Test
  public void testCallableFailureDuration() throws Throwable {

    intercept(RuntimeException.class,
        trackDurationOfCallable(stats, REQUESTS, () -> {
          sleepf(100);
          throw new RuntimeException("oops");
        }));
    assertCounterValue(1);
    assertSummaryValues(
        fetchSuccessSummary(stats, REQUESTS),
        1, -1, -1);

    assertSummaryValues(fetchDurationSummary(stats, REQUESTS, false),
        1, 0, 0);
  }

  /**
   * Duration of the successful execution of an InvocationRaisingIOE.
   */
  @Test
  public void testInvocationDuration() throws Throwable {
    // call the operation
    trackDurationOfInvocation(stats, REQUESTS, () -> {
      sleepf(100);
    });
    assertCounterValue(1);
    DurationStatisticSummary summary = fetchSuccessSummary(stats, REQUESTS);
    assertSummaryValues(summary, 1, 0, 0);
    assertSummaryMean(summary, 1, 0);
  }

  /**
   * Duration of the successful execution of a CallableRaisingIOE.
   */
  @Test
  public void testCallableIOEDuration() throws Throwable {
    // call the operation
    assertThat(
        trackDuration(stats, REQUESTS, () -> sleepf(100)))
        .isEqualTo(100);
    DurationStatisticSummary summary = fetchSuccessSummary(stats, REQUESTS);
    assertSummaryValues(summary, 1, 0, 0);
    assertSummaryMean(summary, 1, 0);
  }

  /**
   * Track the duration of an IOE raising callable which fails.
   */
  @Test
  public void testCallableIOEFailureDuration() throws Throwable {
    intercept(IOException.class,
        () ->
        trackDuration(stats, REQUESTS, () -> {
          sleepf(100);
          throw new IOException("oops");
        }));
    assertSummaryValues(
        fetchSuccessSummary(stats, REQUESTS),
        1, -1, -1);

    assertSummaryValues(fetchDurationSummary(stats, REQUESTS, false),
        1, 0, 0);
  }


  /**
   * Track the duration of a failing operation which is evaluated
   * through {@code FutureIO.eval()}: the exception must surface when
   * the future is awaited, and the failure statistics must be updated.
   */
  @Test
  public void testDurationThroughEval() throws Throwable {
    CompletableFuture<Object> eval = FutureIO.eval(
        trackDurationOfOperation(stats, REQUESTS, () -> {
          sleepf(100);
          throw new FileNotFoundException("oops");
        }));
    intercept(FileNotFoundException.class, "oops", () ->
        FutureIO.awaitFuture(eval));
    assertSummaryValues(fetchDurationSummary(stats, REQUESTS, false),
        1, 0, 0);
  }

  /**
   * It's OK to track a duration against an unknown statistic.
   */
  @Test
  public void testUnknownDuration() throws Throwable {
    trackDurationOfCallable(stats, UNKNOWN, () -> sleepf(1)).call();
    DurationStatisticSummary summary = fetchSuccessSummary(stats, UNKNOWN);
    // nothing is recorded for the unregistered statistic
    assertSummaryValues(summary, 0, -1, -1);
    assertThat(summary.getMean()).isNull();
  }

  /**
   * The stub duration tracker factory can be supplied as an input.
   */
  @Test
  public void testTrackDurationWithStubFactory() throws Throwable {
    trackDuration(STUB_DURATION_TRACKER_FACTORY, UNKNOWN, () -> sleepf(1));
  }

  /**
   * Make sure the tracker returned from the stub factory
   * follows the basic lifecycle: it can be marked as failed
   * and closed more than once.
   */
  @Test
  public void testStubDurationLifecycle() throws Throwable {
    DurationTracker tracker = STUB_DURATION_TRACKER_FACTORY
        .trackDuration("k", 1);
    tracker.failed();
    tracker.close();
    tracker.close();
  }

  /**
   * Assert that a statistics summary has the specific values.
   * @param summary summary data
   * @param count count -must match exactly.
   * @param minBase minimum value for the minimum field (inclusive)
   * @param maxBase minimum value for the maximum field (inclusive)
   */
  protected void assertSummaryValues(
      final DurationStatisticSummary summary,
      final int count,
      final int minBase,
      final int maxBase) {
    assertThat(summary)
        .matches(s -> s.getCount() == count, "Count value")
        .matches(s -> s.getMax() >= maxBase, "Max value")
        .matches(s -> s.getMin() >= minBase, "Min value");
  }

  /**
   * Assert that a summary has a matching mean value.
   * @param summary summary data.
   * @param expectedSampleCount sample count -which must match
   * @param meanGreaterThan the mean must be greater than this value.
   */
  protected void assertSummaryMean(
      final DurationStatisticSummary summary,
      final int expectedSampleCount,
      final double meanGreaterThan) {
    String description = "mean of " + summary;
    assertThat(summary.getMean())
        .describedAs(description)
        .isNotNull();
    assertThat(summary.getMean().getSamples())
        .describedAs(description)
        .isEqualTo(expectedSampleCount);
    assertThat(summary.getMean().mean())
        .describedAs(description)
        .isGreaterThan(meanGreaterThan);
  }
}