/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test consistency of reads while accessing an ObserverNode.
 * The tests are based on traditional (non-fast-path) edit tailing.
 */
public class TestConsistentReadsObserver {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestConsistentReadsObserver.class.getName());

  private static Configuration conf;
  private static MiniQJMHACluster qjmhaCluster;
  private static MiniDFSCluster dfsCluster;
  private DistributedFileSystem dfs;

  private final Path testPath = new Path("/TestConsistentReadsObserver");

  @BeforeClass
  public static void startUpCluster() throws Exception {
    conf = new Configuration();
    conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
    // Disable fast tailing here because this test's assertions are based on
    // the timing of explicit calls to rollEditLogAndTail, although this means
    // the test takes some time to run.
    // TODO: revisit if there is a better way.
    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, false);
    dfsCluster = qjmhaCluster.getDfsCluster();
  }

  @Before
  public void setUp() throws Exception {
    dfs = setObserverRead(true);
  }

  @After
  public void cleanUp() throws IOException {
    dfs.delete(testPath, true);
  }

  @AfterClass
  public static void shutDownCluster() throws IOException {
    if (qjmhaCluster != null) {
      qjmhaCluster.shutdown();
    }
  }

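  /**
   * Test that a read arriving at an observer that has not tailed the latest
   * edits is re-queued until IPC backoff kicks in, at which point the client
   * retries against the active NameNode.
   */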
  @Test
  public void testRequeueCall() throws Exception {
    // Update the configuration just for the observer, by enabling
    // IPC backoff and using the test scheduler class, which starts to back
    // off after a certain number of calls.
    final int observerIdx = 2;
    NameNode nn = dfsCluster.getNameNode(observerIdx);
    int port = nn.getNameNodeAddress().getPort();
    Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
    Configuration configuration = new Configuration(originalConf);
    String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
    configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
        TestRpcScheduler.class.getName());
    configuration.setBoolean(prefix
        + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);

    NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
    assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);

    dfs.create(testPath, (short)1).close();
    assertSentTo(0);

    // Since we haven't tailed edit logs on the observer, it will fall behind
    // and keep re-queueing the incoming request. Eventually, RPC backoff will
    // be triggered and client should retry active NN.
    dfs.getFileStatus(testPath);
    assertSentTo(0);
    assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
    // reset the original call queue
    NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
  }

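  /**
   * Test that a read on the observer blocks until the observer has caught up
   * with the client's state ID, and succeeds once the edit log is rolled and
   * tailed.
   */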
  @Test
  public void testMsyncSimple() throws Exception {
    // 0 == not completed, 1 == succeeded, -1 == failed
    AtomicInteger readStatus = new AtomicInteger(0);

    // Make an uncoordinated call, which initializes the proxy
    // to the observer node.
    dfs.getClient().getHAServiceState();
    dfs.mkdir(testPath, FsPermission.getDefault());
    assertSentTo(0);

    Thread reader = new Thread(() -> {
      try {
        // This read will block until the edits are rolled and tailed.
        dfs.getFileStatus(testPath);
        readStatus.set(1);
      } catch (IOException e) {
        e.printStackTrace();
        readStatus.set(-1);
      }
    });

    reader.start();
    // The reader should still be blocking; it has not succeeded yet.
    assertEquals(0, readStatus.get());
    dfsCluster.rollEditLogAndTail(0);
    // Wait a while for all the changes to be done.
    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
    // The reader should have succeeded.
    assertEquals(1, readStatus.get());
  }

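  /**
   * Shared logic for the msync tests: after an msync (explicit or automatic)
   * brings the second client's state ID up to date with the active, its read
   * must be served by the observer once the observer has tailed the edits.
   *
   * @param autoMsync whether to rely on the proxy provider's periodic
   *                  auto-msync instead of an explicit msync() call
   * @param autoMsyncPeriodMs the auto-msync period to configure; ignored
   *                          when autoMsync is false
   */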
  private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
      throws Exception {
    // 0 == not completed, 1 == succeeded, -1 == failed
    AtomicInteger readStatus = new AtomicInteger(0);
    Configuration conf2 = new Configuration(conf);

    // Disable FS cache so two different DFS clients will be used.
    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
    if (autoMsync) {
      conf2.setTimeDuration(
          ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX
              + "." + dfs.getUri().getHost(),
          autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
    }
    DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);

    // Initialize the proxies for Observer Node.
    dfs.getClient().getHAServiceState();
    // This initialization will perform the msync-on-startup, so another
    // form of msync is required later in the test.
    dfs2.getClient().getHAServiceState();

    // Advance Observer's state ID so it is ahead of client's.
    dfs.mkdir(new Path("/test"), FsPermission.getDefault());
    dfsCluster.rollEditLogAndTail(0);

    dfs.mkdir(testPath, FsPermission.getDefault());
    assertSentTo(0);

    Thread reader = new Thread(() -> {
      try {
        // After msync, client should have the latest state ID from active.
        // Therefore, the subsequent getFileStatus call should succeed.
        if (!autoMsync) {
          // If not testing auto-msync, perform an explicit one here
          dfs2.msync();
        } else if (autoMsyncPeriodMs > 0) {
          Thread.sleep(autoMsyncPeriodMs);
        }
        dfs2.getFileStatus(testPath);
        if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
          readStatus.set(1);
        } else {
          readStatus.set(-1);
        }
      } catch (Exception e) {
        e.printStackTrace();
        readStatus.set(-1);
      }
    });

    reader.start();

    Thread.sleep(100);
    assertEquals(0, readStatus.get());

    dfsCluster.rollEditLogAndTail(0);

    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
    assertEquals(1, readStatus.get());
  }

  @Test
  public void testExplicitMsync() throws Exception {
    testMsync(false, -1);
  }

  @Test
  public void testAutoMsyncPeriod0() throws Exception {
    testMsync(true, 0);
  }

  @Test
  public void testAutoMsyncPeriod5() throws Exception {
    testMsync(true, 5);
  }

  @Test(expected = TimeoutException.class)
  public void testAutoMsyncLongPeriod() throws Exception {
    // This should fail since the auto-msync is never activated
    testMsync(true, Long.MAX_VALUE);
  }

  /**
   * A new client should first contact the active before using an observer,
   * to ensure that it is up to date with the current state.
   */
  @Test
  public void testCallFromNewClient() throws Exception {
    // Set the order of nodes: Observer, Standby, Active
    // This is to ensure that test doesn't pass trivially because the active is
    // the first node contacted
    dfsCluster.transitionToStandby(0);
    dfsCluster.transitionToObserver(0);
    dfsCluster.transitionToStandby(2);
    dfsCluster.transitionToActive(2);
    try {
      // 0 == not completed, 1 == succeeded, -1 == failed
      AtomicInteger readStatus = new AtomicInteger(0);

      // Initialize the proxies for Observer Node.
      dfs.getClient().getHAServiceState();

      // Advance Observer's state ID so it is ahead of client's.
      dfs.mkdir(new Path("/test"), FsPermission.getDefault());
      dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
      dfsCluster.getNameNode(0)
          .getNamesystem().getEditLogTailer().doTailEdits();

      dfs.mkdir(testPath, FsPermission.getDefault());
      assertSentTo(2);

      Configuration conf2 = new Configuration(conf);

      // Disable FS cache so two different DFS clients will be used.
      conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
      DistributedFileSystem dfs2 =
          (DistributedFileSystem) FileSystem.get(conf2);
      dfs2.getClient().getHAServiceState();

      Thread reader = new Thread(() -> {
        try {
          dfs2.getFileStatus(testPath);
          readStatus.set(1);
        } catch (Exception e) {
          e.printStackTrace();
          readStatus.set(-1);
        }
      });

      reader.start();

      Thread.sleep(100);
      assertEquals(0, readStatus.get());

      dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
      dfsCluster.getNameNode(0)
          .getNamesystem().getEditLogTailer().doTailEdits();

      GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
      assertEquals(1, readStatus.get());
    } finally {
      // Put the cluster back the way it was when the test started
      dfsCluster.transitionToStandby(2);
      dfsCluster.transitionToObserver(2);
      dfsCluster.transitionToStandby(0);
      dfsCluster.transitionToActive(0);
    }
  }

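  /**
   * Test that an uncoordinated call (datanodeReport) returns immediately,
   * without waiting for the observer to catch up, while a coordinated read
   * from the same client remains blocked.
   */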
  @Test
  public void testUncoordinatedCall() throws Exception {
    // Make a write call so that the client will be ahead of
    // the observer for now.
    dfs.mkdir(testPath, FsPermission.getDefault());

    // A status flag, initialized to 0; after the reader finishes, it is
    // updated to 1 on success or -1 on error.
    AtomicInteger readStatus = new AtomicInteger(0);

    // create a separate thread to make a blocking read.
    Thread reader = new Thread(() -> {
      try {
        // This read call will block until the server state catches up, which,
        // given the configuration, will take a very long time.
        dfs.getClient().getFileInfo("/");
        readStatus.set(1);
        fail("Should have been interrupted before getting here.");
      } catch (IOException e) {
        e.printStackTrace();
        readStatus.set(-1);
      }
    });
    reader.start();

    long before = Time.now();
    dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
    long after = Time.now();

    // Should succeed immediately, because datanodeReport is marked as an
    // uncoordinated call and will not wait for the server to catch up.
    assertTrue(after - before < 200);
    // By this time, the reader thread should still be blocking, so the status
    // has not been updated.
    assertEquals(0, readStatus.get());
    Thread.sleep(5000);
    // The reader thread's status should still be unchanged after 5 seconds...
    assertEquals(0, readStatus.get());
    // ... and the reader thread is not dead, so it must still be waiting.
    assertEquals(Thread.State.WAITING, reader.getState());
    reader.interrupt();
  }

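  /**
   * Test that the observer rejects a read from a client that is not using
   * ObserverReadProxyProvider, failing it with a StandbyException.
   */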
  @Test
  public void testRequestFromNonObserverProxyProvider() throws Exception {
    // Create another HDFS client using ConfiguredFailoverProxyProvider.
    Configuration conf2 = new Configuration(conf);

    // Populate the above configuration with only a single observer in the
    // namenode list. Also reduce retries to make test finish faster.
    HATestUtil.setFailoverConfigurations(
        conf2,
        HATestUtil.getLogicalHostname(dfsCluster),
        Collections.singletonList(
            dfsCluster.getNameNode(2).getNameNodeAddress()),
        ConfiguredFailoverProxyProvider.class);
    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
    conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1);
    conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1);
    FileSystem dfs2 = FileSystem.get(conf2);

    dfs.mkdir(testPath, FsPermission.getDefault());
    dfsCluster.rollEditLogAndTail(0);

    try {
      // Request should be rejected by observer and throw StandbyException
      dfs2.listStatus(testPath);
      fail("listStatus should have thrown exception");
    } catch (RemoteException re) {
      IOException e = re.unwrapRemoteException();
      assertTrue("should have thrown StandbyException but got "
              + e.getClass().getSimpleName(),
          e instanceof StandbyException);
    }
  }

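  /**
   * Test msync through the FileContext API: after fc.msync() and a roll and
   * tail of the edit log, the newly created directory must be visible on the
   * observer without further requeueing.
   */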
  @Test(timeout=10000)
  public void testMsyncFileContext() throws Exception {
    NameNode nn0 = dfsCluster.getNameNode(0);
    NameNode nn2 = dfsCluster.getNameNode(2);
    HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
    assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState());
    st = nn2.getRpcServer().getServiceStatus();
    assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState());

    FileContext fc = FileContext.getFileContext(conf);
    // initialize observer proxy for FileContext
    fc.getFsStatus(testPath);

    Path p = new Path(testPath, "testMsyncFileContext");
    fc.mkdir(p, FsPermission.getDefault(), true);
    fc.msync();
    dfsCluster.rollEditLogAndTail(0);
    LOG.info("State id active = {}, State id observer = {}",
        nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(),
        nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
    try {
      // If getFileStatus takes too long due to server requeueing,
      // the test will time out.
      fc.getFileStatus(p);
    } catch (FileNotFoundException e) {
      fail("File should exist on Observer after msync");
    }
  }

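  /**
   * Test that re-queued calls do not skew the observer's RPC metrics:
   * RpcQueueTimeNumOps should match RpcProcessingTimeNumOps after a read
   * that was blocked waiting for edits.
   */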
  @Test
  public void testRpcQueueTimeNumOpsMetrics() throws Exception {
    // 0 == not completed, 1 == succeeded, -1 == failed
    AtomicInteger readStatus = new AtomicInteger(0);

    // Make an uncoordinated call, which initializes the proxy
    // to the observer node.
    dfs.getClient().getHAServiceState();
    dfs.mkdir(testPath, FsPermission.getDefault());
    assertSentTo(0);

    Thread reader = new Thread(() -> {
      try {
        // This read will block until the edits are rolled and tailed.
        dfs.getFileStatus(testPath);
        readStatus.set(1);
      } catch (IOException e) {
        e.printStackTrace();
        readStatus.set(-1);
      }
    });

    reader.start();
    // The reader should still be blocking; it has not succeeded yet.
    assertEquals(0, readStatus.get());
    dfsCluster.rollEditLogAndTail(0);
    // Wait a while for all the changes to be done.
    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
    // The reader should have succeeded.
    assertEquals(1, readStatus.get());

    final int observerIdx = 2;
    NameNode observerNN = dfsCluster.getNameNode(observerIdx);
    MetricsRecordBuilder rpcMetrics =
        getMetrics("RpcActivityForPort"
            + observerNN.getNameNodeAddress().getPort());
    long rpcQueueTimeNumOps = getLongCounter("RpcQueueTimeNumOps", rpcMetrics);
    long rpcProcessingTimeNumOps = getLongCounter("RpcProcessingTimeNumOps",
        rpcMetrics);
    assertEquals(rpcQueueTimeNumOps, rpcProcessingTimeNumOps);
  }

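  /** Assert that the last request from dfs was sent to the NameNode at nnIdx. */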
  private void assertSentTo(int nnIdx) throws IOException {
    assertTrue("Request was not sent to the expected namenode " + nnIdx,
        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
  }

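  /**
   * Create a DistributedFileSystem using ObserverReadProxyProvider, with
   * observer reads enabled or disabled according to the flag.
   */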
  private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
    return HATestUtil.configureObserverReadFs(
        dfsCluster, conf, ObserverReadProxyProvider.class, flag);
  }

  /**
   * A dummy test scheduler that starts backoff after a fixed number
   * of requests.
   */
  public static class TestRpcScheduler implements RpcScheduler {
    // Allow a number of RPCs to pass in order for the NN restart to succeed.
    private int allowed = 10;
    public TestRpcScheduler() {}

    @Override
    public int getPriorityLevel(Schedulable obj) {
      return 0;
    }

    @Override
    public boolean shouldBackOff(Schedulable obj) {
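      // Consume one unit of the allowance; back off once it is exhausted.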
      return --allowed < 0;
    }

    @Override
    public void stop() {
    }
  }
}