TestMetricsSourceAdapter.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.impl;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import static org.apache.hadoop.metrics2.lib.Interns.info;

import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;

public class TestMetricsSourceAdapter {
  private static final int RACE_TEST_RUNTIME = 10000; // 10 seconds

  /**
   * Verifies that the key most recently reported by the source is listed in
   * the adapter's MBean info, both on the first read and on a second read
   * taken after the JMX cache TTL has elapsed.
   */
  @Test
  public void testPurgeOldMetrics() throws Exception {
    // The source reports one gauge whose key name changes on every collection.
    PurgableSource purgable = new PurgableSource();
    final MetricsSource built =
        MetricsAnnotations.newSourceBuilder(purgable).build();

    List<MetricsTag> noTags = new ArrayList<>();
    MetricsSourceAdapter adapter = new MetricsSourceAdapter(
        "tst", "tst", "testdesc", built, noTags, null, null, 1, false);

    // Fetch the bean info first, then look up whatever key is current.
    MBeanInfo beanInfo = adapter.getMBeanInfo();
    assertTrue(exportsAttribute(beanInfo, purgable.lastKeyName),
        "The last generated metric is not exported to jmx");

    Thread.sleep(1000); // skip JMX cache TTL

    beanInfo = adapter.getMBeanInfo();
    assertTrue(exportsAttribute(beanInfo, purgable.lastKeyName),
        "The last generated metric is not exported to jmx");
  }

  /**
   * Returns true when {@code beanInfo} lists an attribute whose name equals
   * {@code attributeName}.
   */
  private static boolean exportsAttribute(MBeanInfo beanInfo,
      String attributeName) {
    for (MBeanAttributeInfo attribute : beanInfo.getAttributes()) {
      if (attribute.getName().equals(attributeName)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Metrics source that reports a single gauge under a fresh key name
   * ("key0", "key1", ...) on every call. The name used by the latest call is
   * kept in {@code lastKeyName}.
   */
  private static class PurgableSource implements MetricsSource {
    int nextKey = 0;
    String lastKeyName = null;

    @Override
    public void getMetrics(MetricsCollector collector, boolean all) {
      MetricsRecordBuilder record = collector.addRecord("purgablesource");
      record = record.setContext("test");

      // Mint the key for this call, then advance the counter for the next one.
      final String keyName = "key" + nextKey;
      nextKey++;
      lastKeyName = keyName;

      record.addGauge(info(keyName, "desc"), 1);
    }
  }

  /**
   * Checks that the value seen through {@code getMetrics} and the value of the
   * "C1" JMX attribute agree, first before and then after the counter is
   * incremented.
   */
  @Test
  public void testGetMetricsAndJmx() throws Exception {
    // Annotated source exposing one counter; the assertions below expect 0.
    TestSource counterSource = new TestSource("test");
    final MetricsSource built =
        MetricsAnnotations.newSourceBuilder(counterSource).build();

    List<MetricsTag> noTags = new ArrayList<>();
    MetricsSourceAdapter adapter = new MetricsSourceAdapter(
        "test", "test", "test desc", built, noTags, null, null, 1, false);

    // First collection is requested with all=true.
    MetricsCollectorImpl collector = new MetricsCollectorImpl();
    Iterable<MetricsRecordImpl> records = adapter.getMetrics(collector, true);

    // Initial value via getMetrics ...
    MetricsRecordImpl firstRecord = records.iterator().next();
    long initialValue =
        firstRecord.metrics().iterator().next().value().longValue();
    assertEquals(0L, initialValue);

    // ... and via JMX.
    Thread.sleep(100); // skip JMX cache TTL
    Number jmxInitial = (Number) adapter.getAttribute("C1");
    assertEquals(0L, jmxInitial);

    // Bump the counter.
    counterSource.incrementCnt();

    // Collect again and re-check both views.
    collector = new MetricsCollectorImpl();
    records = adapter.getMetrics(collector, true);
    MetricsRecordImpl secondRecord = records.iterator().next();
    assertTrue(secondRecord.metrics().iterator().hasNext());
    Thread.sleep(100); // skip JMX cache TTL
    Number jmxUpdated = (Number) adapter.getAttribute("C1");
    assertEquals(1L, jmxUpdated);
  }

  /**
   * Annotated metrics source used by {@code testGetMetricsAndJmx}. It declares
   * a single counter field, {@code c1}, which the test bumps through
   * {@link #incrementCnt()} and reads back as the "C1" JMX attribute.
   */
  @SuppressWarnings("unused")
  @Metrics(context="test")
  private static class TestSource {
    // Never assigned in this class; presumably populated when the source is
    // built via MetricsAnnotations.newSourceBuilder(...) -- TODO confirm.
    @Metric("C1 desc") MutableCounterLong c1;
    // Only written here, never read in this class (hence the "unused"
    // suppression); presumably picked up by the source builder -- confirm.
    final MetricsRegistry registry;

    /**
     * @param recName name handed to the {@link MetricsRegistry} constructor
     */
    TestSource(String recName) {
      registry = new MetricsRegistry(recName);
    }

    /** Calls {@code incr()} on the {@code c1} counter. */
    public void incrementCnt() {
      c1.incr();
    }
  }

  /**
   * Test a race condition when updating the JMX cache (HADOOP-12482):
   * 1. Thread A reads the JMX metric every 2 JMX cache TTL. It marks the JMX
   *    cache to be updated by marking lastRecs to null. After this it adds a
   *    new key to the metrics. The next call to read should pick up this new
   *    key.
   * 2. Thread B triggers JMX metric update every 1 JMX cache TTL. It assigns
   *    lastRecs to a new object (not null any more).
   * 3. Thread A tries to read JMX metric again, sees lastRecs is not null and
   *    does not update JMX cache. As a result the read does not pickup the new
   *    metric.
   *
   * Both scheduler threads are shut down in a {@code finally} block so that a
   * failed assertion or an interrupted sleep does not leave them running.
   *
   * @throws Exception if the test thread is interrupted while sleeping or
   *     while waiting for the executors to terminate
   */
  @Test
  public void testMetricCacheUpdateRace() throws Exception {
    // Create test source with a single metric counter of value 1.
    TestMetricsSource source = new TestMetricsSource();
    MetricsSourceBuilder sourceBuilder =
        MetricsAnnotations.newSourceBuilder(source);

    final long jmxCacheTtlMs = 250;
    List<MetricsTag> injectedTags = new ArrayList<>();
    MetricsSourceAdapter sourceAdapter =
        new MetricsSourceAdapter("test", "test",
            "test JMX cache update race condition", sourceBuilder.build(),
            injectedTags, null, null, jmxCacheTtlMs, false);

    ScheduledExecutorService updaterExecutor =
        Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
    ScheduledExecutorService readerExecutor =
        Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());

    try {
      final AtomicBoolean hasError = new AtomicBoolean(false);

      // Wake up every 1 JMX cache TTL to set lastRecs before updateJmxCache()
      // is called.
      SourceUpdater srcUpdater = new SourceUpdater(sourceAdapter, hasError);
      ScheduledFuture<?> updaterFuture =
          updaterExecutor.scheduleAtFixedRate(srcUpdater,
              sourceAdapter.getJmxCacheTTL(), sourceAdapter.getJmxCacheTTL(),
              TimeUnit.MILLISECONDS);
      srcUpdater.setFuture(updaterFuture);

      // Wake up every 2 JMX cache TTL so updateJmxCache() will try to update
      // JMX cache.
      SourceReader srcReader =
          new SourceReader(source, sourceAdapter, hasError);
      ScheduledFuture<?> readerFuture =
          readerExecutor.scheduleAtFixedRate(srcReader,
              0, // set JMX info cache at the beginning
              2 * sourceAdapter.getJmxCacheTTL(), TimeUnit.MILLISECONDS);
      srcReader.setFuture(readerFuture);

      // Let the threads do their work.
      Thread.sleep(RACE_TEST_RUNTIME);

      assertFalse(hasError.get(), "Hit error");
    } finally {
      // Always stop the schedulers, even when the assertion above fails;
      // otherwise the periodic tasks would outlive this test.
      updaterExecutor.shutdownNow();
      readerExecutor.shutdownNow();
      updaterExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
      readerExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Metrics source holding one key/value pair. Every access to the pair is
   * made while holding this object's monitor, so the key and value are always
   * read and written together.
   */
  private static class TestMetricsSource implements MetricsSource {
    private String key = "key0";
    private int val = 0;

    /** Returns the current key. */
    String getKey() {
      synchronized (this) {
        return key;
      }
    }

    /** Replaces the key and the value in one step. */
    void setKV(final String newKey, final int newVal) {
      synchronized (this) {
        key = newKey;
        val = newVal;
      }
    }

    /** Reports the current pair as a single gauge in the "test" context. */
    @Override
    public void getMetrics(MetricsCollector collector, boolean all) {
      MetricsRecordBuilder record = collector.addRecord("TestMetricsSource");
      record = record.setContext("test");
      // Read the pair and add the gauge under the same lock as setKV().
      synchronized (this) {
        final String currentKey = key;
        final int currentVal = val;
        record.addGauge(info(currentKey, "TestMetricsSource key"), currentVal);
      }
    }
  }

  /**
   * A task that calls {@code getMetrics} on the source adapter each time it
   * runs; the test schedules it once per JMX cache TTL. Any exception sets the
   * shared error flag, and once the flag is set the task cancels its own
   * schedule.
   */
  private static class SourceUpdater implements Runnable {
    private static final Logger LOG = Logger.getLogger(SourceUpdater.class);

    private final MetricsSourceAdapter sa;
    private final AtomicBoolean hasError;
    // Set by the scheduling thread after the task has already been handed to
    // the executor, and read on the executor thread: must be volatile, and may
    // still be null during an early run.
    private volatile ScheduledFuture<?> future = null;

    /**
     * @param sourceAdapter adapter whose metrics are collected on every run
     * @param err flag shared with the test; set to true on any failure
     */
    public SourceUpdater(MetricsSourceAdapter sourceAdapter,
        AtomicBoolean err) {
      sa = sourceAdapter;
      hasError = err;
    }

    /**
     * @param f handle of this task's own schedule, used to cancel it on error
     */
    public void setFuture(ScheduledFuture<?> f) {
      future = f;
    }

    @Override
    public void run() {
      MetricsCollectorImpl builder = new MetricsCollectorImpl();
      try {
        // This resets lastRecs.
        sa.getMetrics(builder, true);
        LOG.info("reset lastRecs");
      } catch (Exception e) {
        // catch all errors
        hasError.set(true);
        // Pass the throwable itself so the stack trace is actually logged.
        LOG.error("Failed to collect metrics from the source adapter", e);
      } finally {
        if (hasError.get()) {
          LOG.error("Hit error, stopping now");
          // The future may not have been published yet; a later run will
          // see the error flag again and cancel then.
          final ScheduledFuture<?> self = future;
          if (self != null) {
            self.cancel(false);
          }
        }
      }
    }
  }

  /**
   * A task that reads the adapter's MBean info each time it runs; the test
   * schedules it every 2 JMX cache TTLs, starting immediately. After each
   * successful read it updates the metric source to report a new key. The next
   * read must be able to pick up this new key; if it does not, or if any
   * exception is thrown, the shared error flag is set and the task cancels its
   * own schedule.
   */
  private static class SourceReader implements Runnable {
    private static final Logger LOG = Logger.getLogger(SourceReader.class);

    private final MetricsSourceAdapter sa;
    private final TestMetricsSource src;
    private final AtomicBoolean hasError;
    private int cnt = 0;
    // Set by the scheduling thread after the task has already been handed to
    // the executor (with zero initial delay), and read on the executor thread:
    // must be volatile, and may still be null during the first run.
    private volatile ScheduledFuture<?> future = null;

    /**
     * @param source metrics source whose key is advanced after each read
     * @param sourceAdapter adapter whose MBean info is read on every run
     * @param err flag shared with the test; set to true on any failure
     */
    public SourceReader(
        TestMetricsSource source, MetricsSourceAdapter sourceAdapter,
        AtomicBoolean err) {
      src = source;
      sa = sourceAdapter;
      hasError = err;
    }

    /**
     * @param f handle of this task's own schedule, used to cancel it on error
     */
    public void setFuture(ScheduledFuture<?> f) {
      future = f;
    }

    @Override
    public void run() {
      try {
        // This will trigger updateJmxCache().
        MBeanInfo info = sa.getMBeanInfo();
        final String key = src.getKey();
        for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
          // Found the new key, update the metric source and move on.
          if (mBeanAttributeInfo.getName().equals(key)) {
            LOG.info("found key/val=" + cnt + "/" + cnt);
            cnt++;
            src.setKV("key" + cnt, cnt);
            return;
          }
        }
        LOG.error("key=" + key + " not found. Stopping now.");
        hasError.set(true);
      } catch (Exception e) {
        // catch other errors
        hasError.set(true);
        // Pass the throwable itself so the stack trace is actually logged.
        LOG.error("Failed to read MBean info from the source adapter", e);
      } finally {
        if (hasError.get()) {
          // The future may not have been published yet; a later run will
          // see the error flag again and cancel then.
          final ScheduledFuture<?> self = future;
          if (self != null) {
            self.cancel(false);
          }
        }
      }
    }
  }
}