// TestFSNamesystemLock.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.namenode;

import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.Time;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import static org.junit.Assert.*;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FSLOCK_FAIR_KEY;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;

/**
 * Tests the FSNamesystemLock, looking at lock compatibilities and
 * proper logging of lock hold times.
 */
public class TestFSNamesystemLock {

  /**
   * Checks that the value of {@code DFS_NAMENODE_FSLOCK_FAIR_KEY} in the
   * configuration is reflected by the fairness of the lock's
   * {@code coarseLock}, for both the fair and the non-fair setting.
   */
  @Test
  public void testFsLockFairness() throws IOException, InterruptedException{
    Configuration conf = new Configuration();

    // Same configuration object is reused; a fresh lock is built per setting.
    for (boolean fair : new boolean[] {true, false}) {
      conf.setBoolean(DFS_NAMENODE_FSLOCK_FAIR_KEY, fair);
      FSNamesystemLock lock = new FSNamesystemLock(conf, "FSN", null);
      if (fair) {
        assertTrue(lock.coarseLock.isFair());
      } else {
        assertFalse(lock.coarseLock.isFair());
      }
    }
  }

  /**
   * Exercises re-entrant acquisition of the read lock and of the write lock
   * from a single thread, checking the hold counts (and, for the write lock,
   * {@code isWriteLockedByCurrentThread()}) after every acquire and release.
   */
  @Test
  public void testFSNamesystemLockCompatibility() {
    final int depth = 2;
    FSNamesystemLock lock = new FSNamesystemLock(new Configuration(), "FSN", null);

    // Read lock: climb to 'depth' holds, then unwind back to zero.
    assertEquals(0, lock.getReadHoldCount());
    for (int held = 1; held <= depth; held++) {
      lock.readLock();
      assertEquals(held, lock.getReadHoldCount());
    }
    for (int held = depth - 1; held >= 0; held--) {
      lock.readUnlock();
      assertEquals(held, lock.getReadHoldCount());
    }

    // Write lock: same ladder, additionally tracking ownership by this thread.
    assertFalse(lock.isWriteLockedByCurrentThread());
    assertEquals(0, lock.getWriteHoldCount());
    for (int held = 1; held <= depth; held++) {
      lock.writeLock();
      assertTrue(lock.isWriteLockedByCurrentThread());
      assertEquals(held, lock.getWriteHoldCount());
    }
    for (int held = depth - 1; held >= 0; held--) {
      lock.writeUnlock();
      if (held > 0) {
        // Still held re-entrantly until the last unlock.
        assertTrue(lock.isWriteLockedByCurrentThread());
      } else {
        assertFalse(lock.isWriteLockedByCurrentThread());
      }
      assertEquals(held, lock.getWriteHoldCount());
    }
  }

  /**
   * Checks that {@code getQueueLength()} reports every thread that is blocked
   * waiting for the lock: the test thread holds the write lock while
   * {@code threadCount} helper threads each try to take the read lock.
   *
   * <p>The write lock is released and the helper pool shut down in a
   * {@code finally} block so that the helper threads are not left blocked
   * forever once the test method returns.
   *
   * @throws InterruptedException if the test thread is interrupted while
   *     waiting for the helpers to start or for the queue length to settle
   */
  @Test
  public void testFSLockGetWaiterCount() throws InterruptedException {
    final int threadCount = 3;
    final CountDownLatch latch = new CountDownLatch(threadCount);
    final Configuration conf = new Configuration();
    conf.setBoolean(DFS_NAMENODE_FSLOCK_FAIR_KEY, true);
    final FSNamesystemLock rwLock = new FSNamesystemLock(conf, "FSN", null);
    final ExecutorService helper = Executors.newFixedThreadPool(threadCount);

    rwLock.writeLock();
    try {
      for (int x = 0; x < threadCount; x++) {
        helper.execute(() -> {
          // Signal that this helper is running, then block on the read lock.
          latch.countDown();
          rwLock.readLock();
          // Only reached after the test thread releases the write lock.
          rwLock.readUnlock();
        });
      }

      // All helpers have started; they may not have queued on the lock yet,
      // so poll the queue length instead of asserting it immediately.
      latch.await();
      try {
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return (threadCount == rwLock.getQueueLength());
          }
        }, 10, 1000);
      } catch (TimeoutException e) {
        fail("Expected number of blocked thread not found");
      }
    } finally {
      // Unblock the helpers and let the pool threads exit.
      rwLock.writeUnlock();
      helper.shutdown();
    }
  }

  /**
   * Test when FSNamesystem write lock is held for a long time,
   * logger will report it.
   *
   * <p>Time is driven by a {@link FakeTimer}, so "holding the lock for a long
   * time" means advancing the timer between lock and unlock. A report is
   * detected by looking for this test method's name in the captured log
   * output (it shows up in the logged stack trace).
   */
  @Test(timeout=45000)
  public void testFSWriteLockLongHoldingReport() throws Exception {
    final long writeLockReportingThreshold = 100L;
    final long writeLockSuppressWarningInterval = 10000L;
    Configuration conf = new Configuration();
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
        writeLockReportingThreshold);
    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
        writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);

    final FakeTimer timer = new FakeTimer();
    final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, "FSN", null, timer);
    // Presumably moves the clock past the suppress interval so that the very
    // first long hold below is eligible for reporting -- TODO confirm against
    // FSNamesystemLock.
    timer.advance(writeLockSuppressWarningInterval);

    LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
    GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);

    // Don't report if the write lock is held for a short time
    fsnLock.writeLock();
    fsnLock.writeUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));

    // Report if the write lock is held for a long time
    fsnLock.writeLock();
    timer.advance(writeLockReportingThreshold + 10);
    logs.clearOutput();
    fsnLock.writeUnlock();
    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));

    // Track but do not report if the write lock is held (interruptibly) for
    // a long time but time since last report does not exceed the suppress
    // warning interval
    fsnLock.writeLockInterruptibly();
    timer.advance(writeLockReportingThreshold + 10);
    logs.clearOutput();
    fsnLock.writeUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));

    // Track but do not report if it's held for a long time when re-entering
    // write lock but time since last report does not exceed the suppress
    // warning interval
    fsnLock.writeLock();
    timer.advance(writeLockReportingThreshold / 2 + 1);
    fsnLock.writeLockInterruptibly();
    timer.advance(writeLockReportingThreshold / 2 + 1);
    fsnLock.writeLock();
    timer.advance(writeLockReportingThreshold / 2);
    // Three nested holds were taken above, so three unlocks follow; none of
    // them may produce a report.
    logs.clearOutput();
    fsnLock.writeUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
    logs.clearOutput();
    fsnLock.writeUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
    logs.clearOutput();
    fsnLock.writeUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));

    // Report if it's held for a long time and time since last report exceeds
    // the suppress warning interval
    timer.advance(writeLockSuppressWarningInterval);
    fsnLock.writeLock();
    timer.advance(writeLockReportingThreshold + 100);
    logs.clearOutput();
    fsnLock.writeUnlock();
    // look for the method name in the stack trace
    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
    // find the held interval time in the log
    Pattern pattern = Pattern.compile(".*[\n].*\\d+ms(.*[\n].*){1,}");
    assertTrue(pattern.matcher(logs.getOutput()).find());
    // only keep the "yyyy-MM-dd" part of date
    String startTimeStr =
        "held at " + Time.formatTime(timer.now()).substring(0, 10);
    assertTrue(logs.getOutput().contains(startTimeStr));
    // Presumably the 2 counts the two long holds above that were tracked but
    // not reported (the interruptible hold and the re-entrant hold).
    assertTrue(logs.getOutput().contains(
        "Number of suppressed write-lock reports of FSNLock is 2"));
  }

  /**
   * Test when FSNamesystem read lock is held for a long time,
   * logger will report it.
   *
   * <p>Time is driven by a {@link FakeTimer}, so "holding the lock for a long
   * time" means advancing the timer between lock and unlock. Helper threads
   * are used so that different holds produce distinguishable stack traces
   * (each anonymous {@code Thread} subclass has its own class name).
   */
  @Test(timeout=45000)
  public void testFSReadLockLongHoldingReport() throws Exception {
    final long readLockReportingThreshold = 100L;
    final long readLockSuppressWarningInterval = 10000L;
    // Prefix of the report line; checked together with the method name.
    final String readLockLogStmt = "Number of suppressed read-lock reports";
    Configuration conf = new Configuration();
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
        readLockReportingThreshold);
    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
        readLockSuppressWarningInterval, TimeUnit.MILLISECONDS);

    final FakeTimer timer = new FakeTimer();
    final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, "FSN", null, timer);
    // Presumably moves the clock past the suppress interval so that the very
    // first long hold below is eligible for reporting -- TODO confirm against
    // FSNamesystemLock.
    timer.advance(readLockSuppressWarningInterval);

    LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
    GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);

    // Don't report if the read lock is held for a short time
    fsnLock.readLock();
    fsnLock.readUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
        logs.getOutput().contains(readLockLogStmt));

    // Report the first read lock warning if it is held for a long time
    fsnLock.readLock();
    timer.advance(readLockReportingThreshold + 10);
    logs.clearOutput();
    fsnLock.readUnlock();
    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
        logs.getOutput().contains(readLockLogStmt));

    // Track but do not report if the read lock is held for a long time but
    // time since last report does not exceed the suppress warning interval
    fsnLock.readLock();
    timer.advance(readLockReportingThreshold + 10);
    logs.clearOutput();
    fsnLock.readUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
        logs.getOutput().contains(readLockLogStmt));

    // Track but do not Report if it's held for a long time when re-entering
    // read lock but time since last report does not exceed the suppress
    // warning interval
    Thread tLong = new Thread() {
      @Override
      public void run() {
        fsnLock.readLock();
        // Add one lock hold which is the longest, but occurs under a different
        // stack trace, to ensure this is the one that gets logged
        timer.advance(readLockReportingThreshold + 20);
        fsnLock.readUnlock();
      }
    };
    // Run tLong to completion before the main thread takes its own holds.
    tLong.start();
    tLong.join();
    fsnLock.readLock();
    timer.advance(readLockReportingThreshold / 2 + 1);
    fsnLock.readLock();
    timer.advance(readLockReportingThreshold / 2 + 1);
    logs.clearOutput();
    fsnLock.readUnlock();
    // Inner unlock of the re-entrant hold: neither marker may appear.
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
        logs.getOutput().contains(readLockLogStmt));
    logs.clearOutput();
    fsnLock.readUnlock();
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
        logs.getOutput().contains(readLockLogStmt));
    // Move past the suppress interval so the next long hold is reported.
    timer.advance(readLockSuppressWarningInterval);
    fsnLock.readLock();
    timer.advance(readLockReportingThreshold + 1);
    fsnLock.readUnlock();
    // Assert that stack trace eventually logged is the one for the longest hold
    // The first format() fills in the log statement and leaves a literal "%s"
    // (quoted with \Q...\E) to be filled later with a thread class name; the
    // pattern then skips five lines before expecting "<class>.run".
    String stackTracePatternString =
        String.format("INFO.+%s(.+\n){5}\\Q%%s\\E\\.run", readLockLogStmt);
    Pattern tLongPattern = Pattern.compile(
        String.format(stackTracePatternString, tLong.getClass().getName()));
    assertTrue(tLongPattern.matcher(logs.getOutput()).find());
    // only keep the "yyyy-MM-dd" part of date
    String startTimeStr =
        "held at " + Time.formatTime(timer.now()).substring(0, 10);
    assertTrue(logs.getOutput().contains(startTimeStr));
    // Presumably the 3 counts the long holds above that were tracked but not
    // reported (the second main-thread hold, tLong's hold and the re-entrant
    // hold) -- TODO confirm against FSNamesystemLock.
    assertTrue(logs.getOutput().contains(
        "Number of suppressed read-lock reports of FSNLock is 3"));

    // Report if it's held for a long time (and time since last report
    // exceeds the suppress warning interval) while another thread also has the
    // read lock. Let one thread hold the lock long enough to activate an
    // alert, then have another thread grab the read lock to ensure that this
    // doesn't reset the timing.
    timer.advance(readLockSuppressWarningInterval);
    logs.clearOutput();
    final CountDownLatch barrier = new CountDownLatch(1);
    final CountDownLatch barrier2 = new CountDownLatch(1);
    // NOTE(review): fail() inside t1/t2 throws in the helper thread; it is not
    // propagated to the test thread, so it would not by itself fail the test.
    Thread t1 = new Thread() {
      @Override
      public void run() {
        try {
          fsnLock.readLock();
          timer.advance(readLockReportingThreshold + 1);
          barrier.countDown(); // Allow for t2 to acquire the read lock
          barrier2.await(); // Wait until t2 has the read lock
          fsnLock.readUnlock();
        } catch (InterruptedException e) {
          fail("Interrupted during testing");
        }
      }
    };
    Thread t2 = new Thread() {
      @Override
      public void run() {
        try {
          // Wait until t1 holds the lock and has advanced the timer
          barrier.await();
          fsnLock.readLock();
          barrier2.countDown(); // Allow for t1 to unlock
          fsnLock.readUnlock();
        } catch (InterruptedException e) {
          fail("Interrupted during testing");
        }
      }
    };
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // Look for the differentiating class names in the stack trace
    Pattern t1Pattern = Pattern.compile(
        String.format(stackTracePatternString, t1.getClass().getName()));
    assertTrue(t1Pattern.matcher(logs.getOutput()).find());
    Pattern t2Pattern = Pattern.compile(
        String.format(stackTracePatternString, t2.getClass().getName()));
    assertFalse(t2Pattern.matcher(logs.getOutput()).find());
    // match the held interval time in the log
    Pattern pattern = Pattern.compile(".*[\n].*\\d+ms(.*[\n].*){1,}");
    assertTrue(pattern.matcher(logs.getOutput()).find());
  }

  /**
   * With detailed lock metrics enabled, performs a fixed sequence of read and
   * write lock holds under a {@link FakeTimer} and checks the per-operation
   * and overall hold-time metrics recorded in the supplied
   * {@link MutableRatesWithAggregation}.
   */
  @Test
  public void testDetailedHoldMetrics() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, true);
    FakeTimer clock = new FakeTimer();
    MutableRatesWithAggregation lockRates =
        new MetricsRegistry("Test").newRatesWithAggregation("Test");
    FSNamesystemLock lock = new FSNamesystemLock(conf, "FSN", lockRates, clock);

    // "foo": two separate read holds of 1.3 ms and 2.4 ms.
    lock.readLock();
    clock.advanceNanos(1300000);
    lock.readUnlock("foo");
    lock.readLock();
    clock.advanceNanos(2400000);
    lock.readUnlock("foo");

    // "bar": one re-entrant read hold; the timer moves by 1 twice while the
    // outer hold is active (the expectations below treat that as 2,000,000 ns
    // and a single operation).
    lock.readLock();
    clock.advance(1);
    lock.readLock();
    clock.advance(1);
    lock.readUnlock("bar");
    lock.readUnlock("bar");

    // "baz": one write hold while the timer moves by 1.
    lock.writeLock();
    clock.advance(1);
    lock.writeUnlock("baz", false);

    MetricsRecordBuilder snapshot = MetricsAsserts.mockMetricsRecordBuilder();
    lockRates.snapshot(snapshot, true);

    // Per-operation metrics: foo averages (1.3 ms + 2.4 ms) / 2 = 1.85 ms.
    assertGauge("FSNReadLockFooNanosAvgTime", 1850000.0, snapshot);
    assertCounter("FSNReadLockFooNanosNumOps", 2L, snapshot);
    assertGauge("FSNReadLockBarNanosAvgTime", 2000000.0, snapshot);
    assertCounter("FSNReadLockBarNanosNumOps", 1L, snapshot);
    assertGauge("FSNWriteLockBazNanosAvgTime", 1000000.0, snapshot);
    assertCounter("FSNWriteLockBazNanosNumOps", 1L, snapshot);

    // Overall: reads average (1.3 + 2.4 + 2.0) ms / 3 = 1.9 ms over 3 ops.
    assertGauge("FSNReadLockOverallNanosAvgTime", 1900000.0, snapshot);
    assertCounter("FSNReadLockOverallNanosNumOps", 3L, snapshot);
    assertGauge("FSNWriteLockOverallNanosAvgTime", 1000000.0, snapshot);
    assertCounter("FSNWriteLockOverallNanosNumOps", 1L, snapshot);
  }

  /**
   * Test to suppress FSNameSystem write lock report when it is held for long
   * time.
   *
   * <p>A first long hold released with the plain {@code writeUnlock()} must
   * produce a report. A second, equally long hold released with
   * {@code writeUnlock(opName, true)} must produce none, even though the time
   * since the previous report is past the suppress-warning interval.
   */
  @Test(timeout = 45000)
  public void testFSWriteLockReportSuppressed() throws Exception {
    final long writeLockReportingThreshold = 1L;
    final long writeLockSuppressWarningInterval = 10L;
    // Prefix of the report line. The same text is used for the positive and
    // the negative check so the negative check cannot pass vacuously.
    final String writeLockLogStmt = "Number of suppressed write-lock reports";
    Configuration conf = new Configuration();
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
        writeLockReportingThreshold);
    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
        writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);

    final FakeTimer timer = new FakeTimer();
    final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, "FSN", null, timer);
    timer.advance(writeLockSuppressWarningInterval);

    LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
    GenericTestUtils
        .setLogLevel(LoggerFactory.getLogger(FSNamesystem.class.getName()),
            Level.INFO);

    // Should trigger the write lock report
    fsnLock.writeLock();
    timer.advance(writeLockReportingThreshold + 100);
    fsnLock.writeUnlock();
    assertTrue(logs.getOutput().contains(writeLockLogStmt));

    logs.clearOutput();

    // Suppress report if the write lock is held for a long time. The timer
    // advance here is larger than the suppress-warning interval, so the
    // absence of a report is due to the suppress flag passed to writeUnlock.
    fsnLock.writeLock();
    timer.advance(writeLockReportingThreshold + 100);
    fsnLock.writeUnlock("testFSWriteLockReportSuppressed", true);
    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
    assertFalse(logs.getOutput().contains(writeLockLogStmt));
  }

}