TestJournalNode.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.thirdparty.com.google.common.primitives.Bytes;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StopWatch;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


public class TestJournalNode {
  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
      12345, "mycluster", "my-bp", 0L);
  @Rule
  public TestName testName = new TestName();

  private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class);

  private JournalNode jn;
  private Journal journal; 
  private final Configuration conf = new Configuration();
  private IPCLoggerChannel ch;
  private String journalId;

  static {
    // Avoid an error when we double-initialize JvmMetrics
    DefaultMetricsSystem.setMiniClusterMode(true);
  }
  
  @Before
  public void setup() throws Exception {
    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
        File.separator + "TestJournalNode");
    FileUtil.fullyDelete(editsDir);
    journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();

    if (testName.getMethodName().equals("testJournalDirPerNameSpace")) {
      setFederationConf();
      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns1",
          editsDir + File.separator + "ns1");
      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns2",
          editsDir + File.separator + "ns2");
    } else if (testName.getMethodName().equals(
        "testJournalCommonDirAcrossNameSpace")){
      setFederationConf();
      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
          editsDir.getAbsolutePath());
    } else if (testName.getMethodName().equals(
        "testJournalDefaultDirForOneNameSpace") ||
        testName.getMethodName().equals("testJournalMetricTags")) {
      FileUtil.fullyDelete(new File(DFSConfigKeys
          .DFS_JOURNALNODE_EDITS_DIR_DEFAULT));
      setFederationConf();
      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns1",
          editsDir + File.separator + "ns1");
    } else {
      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
          editsDir.getAbsolutePath());
    }
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
        "0.0.0.0:0");
    if (testName.getMethodName().equals(
        "testJournalNodeSyncerNotStartWhenSyncDisabled")) {
      conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
          false);
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
          "qjournal://jn0:9900;jn1:9901/" + journalId);
    } else if (testName.getMethodName().equals(
        "testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI")) {
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
          "qjournal://journal0\\:9900;journal1:9901/" + journalId);
    } else if (testName.getMethodName().equals(
        "testJournalNodeSyncerNotStartWhenSyncEnabled")) {
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
          "qjournal://jn0:9900;jn1:9901/" + journalId);
    } else if (testName.getMethodName().equals(
        "testJournalNodeSyncwithFederationTypeConfigWithNameServiceId")) {
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1",
          "qjournal://journalnode0:9900;journalnode0:9901/" + journalId);
    } else if (testName.getMethodName().equals(
        "testJournalNodeSyncwithFederationTypeConfigWithNamenodeId")) {
      conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
          "qjournal://journalnode0:9900;journalnode1:9901/" +journalId);
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
          "qjournal://journalnode0:9900;journalnode1:9901/" +journalId);
    } else if (testName.getMethodName().equals(
        "testJournalNodeSyncwithFederationTypeIncorrectConfigWithNamenodeId")) {
      conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
          "qjournal://journalnode0:9900;journalnode1:9901/" + journalId);
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
          "qjournal://journalnode0:9902;journalnode1:9903/" + journalId);
    } else if (testName.getMethodName().equals("testConfAbnormalHandlerNumber")) {
      conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY, -1);
    } else if (testName.getMethodName().equals("testConfNormalHandlerNumber")) {
      conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY, 10);
    }
    jn = new JournalNode();
    jn.setConf(conf);
    jn.start();


    if (testName.getMethodName().equals("testJournalDirPerNameSpace") ||
        testName.getMethodName().equals(
            "testJournalCommonDirAcrossNameSpace") ||
        testName.getMethodName().equals(
            "testJournalDefaultDirForOneNameSpace") ||
        testName.getMethodName().equals("testJournalMetricTags")) {
      Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
      for(String nsId: nameServiceIds) {
        journalId = "test-journalid-" + nsId;
        journal = jn.getOrCreateJournal(journalId, nsId,
            HdfsServerConstants.StartupOption.REGULAR);
        NamespaceInfo fakeNameSpaceInfo = new NamespaceInfo(
            12345, "mycluster", "my-bp"+nsId, 0L);
        journal.format(fakeNameSpaceInfo, false);
      }
    } else {
      journal = jn.getOrCreateJournal(journalId);
      journal.format(FAKE_NSINFO, false);
    }

    
    ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
  }

  private void setFederationConf() {
    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1, ns2");

    //ns1
    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
        "qjournal://journalnode0:9900;journalnode1:9901/test-journalid-ns1");
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
        "qjournal://journalnode0:9900;journalnode1:9901/test-journalid-ns1");

    //ns2
    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns2", "nn3,nn4");
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns2" +".nn3",
        "qjournal://journalnode0:9900;journalnode1:9901/test-journalid-ns2");
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns2" +".nn4",
        "qjournal://journalnode0:9900;journalnode1:9901/test-journalid-ns2");
  }
  
  @After
  public void teardown() throws Exception {
    jn.stop(0);
  }

  @Test(timeout=100000)
  public void testJournalDirPerNameSpace() {
    Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
    setupStaticHostResolution(2, "journalnode");
    for (String nsId : nameServiceIds) {
      String jid = "test-journalid-" + nsId;
      Journal nsJournal = jn.getJournal(jid);
      JNStorage journalStorage = nsJournal.getStorage();
      File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
          File.separator + "TestJournalNode" + File.separator
          + nsId + File.separator + jid);
      assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
    }
  }

  @Test(timeout=100000)
  public void testJournalCommonDirAcrossNameSpace() {
    Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
    setupStaticHostResolution(2, "journalnode");
    for (String nsId : nameServiceIds) {
      String jid = "test-journalid-" + nsId;
      Journal nsJournal = jn.getJournal(jid);
      JNStorage journalStorage = nsJournal.getStorage();
      File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
          File.separator + "TestJournalNode" + File.separator + jid);
      assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
    }
  }

  @Test(timeout=100000)
  public void testJournalDefaultDirForOneNameSpace() {
    Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
    setupStaticHostResolution(2, "journalnode");
    String jid = "test-journalid-ns1";
    Journal nsJournal = jn.getJournal(jid);
    JNStorage journalStorage = nsJournal.getStorage();
    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
        File.separator + "TestJournalNode" + File.separator + "ns1" + File
        .separator + jid);
    assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
    jid = "test-journalid-ns2";
    nsJournal = jn.getJournal(jid);
    journalStorage = nsJournal.getStorage();
    editsDir = new File(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT +
        File.separator + jid);
    assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
  }

  @Test(timeout=100000)
  public void testJournalMetricTags() {
    setupStaticHostResolution(2, "journalnode");
    String jid = "test-journalid-ns1";
    Journal nsJournal = jn.getJournal(jid);
    MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
        nsJournal.getMetrics().getName());
    MetricsAsserts.assertTag("JournalId", jid, metrics);

    jid = "test-journalid-ns2";
    nsJournal = jn.getJournal(jid);
    metrics = MetricsAsserts.getMetrics(
        nsJournal.getMetrics().getName());
    MetricsAsserts.assertTag("JournalId", jid, metrics);
  }

  @Test(timeout=100000)
  public void testJournal() throws Exception {
    MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
        journal.getMetrics().getName());
    MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
    MetricsAsserts.assertGauge("LastJournalTimestamp", 0L, metrics);

    long beginTimestamp = System.currentTimeMillis();
    IPCLoggerChannel ch = new IPCLoggerChannel(
        conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
    ch.newEpoch(1).get();
    ch.setEpoch(1);
    ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
    ch.sendEdits(1L, 1, 1, "hello".getBytes(StandardCharsets.UTF_8)).get();
    
    metrics = MetricsAsserts.getMetrics(
        journal.getMetrics().getName());
    MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
    long lastJournalTimestamp = MetricsAsserts.getLongGauge(
        "LastJournalTimestamp", metrics);
    assertTrue(lastJournalTimestamp > beginTimestamp);
    beginTimestamp = lastJournalTimestamp;

    ch.setCommittedTxId(100L);
    ch.sendEdits(1L, 2, 1, "goodbye".getBytes(StandardCharsets.UTF_8)).get();

    metrics = MetricsAsserts.getMetrics(
        journal.getMetrics().getName());
    MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
    MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
    lastJournalTimestamp = MetricsAsserts.getLongGauge(
        "LastJournalTimestamp", metrics);
    assertTrue(lastJournalTimestamp > beginTimestamp);

  }
  
  
  @Test(timeout=100000)
  public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
    ch.newEpoch(1).get();
    ch.setEpoch(1);
    ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
    ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get();
    
    // Switch to a new epoch without closing earlier segment
    NewEpochResponseProto response = ch.newEpoch(2).get();
    ch.setEpoch(2);
    assertEquals(1, response.getLastSegmentTxId());
    
    ch.finalizeLogSegment(1, 2).get();
    
    // Switch to a new epoch after just closing the earlier segment.
    response = ch.newEpoch(3).get();
    ch.setEpoch(3);
    assertEquals(1, response.getLastSegmentTxId());
    
    // Start a segment but don't write anything, check newEpoch segment info
    ch.startLogSegment(3, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
    response = ch.newEpoch(4).get();
    ch.setEpoch(4);
    // Because the new segment is empty, it is equivalent to not having
    // started writing it. Hence, we should return the prior segment txid.
    assertEquals(1, response.getLastSegmentTxId());
  }
  
  @Test(timeout=100000)
  public void testHttpServer() throws Exception {
    String urlRoot = jn.getHttpServerURI();
    
    // Check default servlets.
    String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));
    assertTrue("Bad contents: " + pageContents,
        pageContents.contains(
            "Hadoop:service=JournalNode,name=JvmMetrics"));

    // Create some edits on server side
    byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3);
    IPCLoggerChannel ch = new IPCLoggerChannel(
        conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
    ch.newEpoch(1).get();
    ch.setEpoch(1);
    ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
    ch.sendEdits(1L, 1, 3, EDITS_DATA).get();
    ch.finalizeLogSegment(1, 3).get();

    // Attempt to retrieve via HTTP, ensure we get the data back
    // including the header we expected
    byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot +
        "/getJournal?segmentTxId=1&jid=" + journalId));
    byte[] expected = Bytes.concat(
            Ints.toByteArray(HdfsServerConstants.NAMENODE_LAYOUT_VERSION),
            (new byte[] { 0, 0, 0, 0 }), // layout flags section
            EDITS_DATA);

    assertArrayEquals(expected, retrievedViaHttp);
    
    // Attempt to fetch a non-existent file, check that we get an
    // error status code
    URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId);
    HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection();
    try {
      assertEquals(404, connection.getResponseCode());
    } finally {
      connection.disconnect();
    }
  }

  /**
   * Test that the JournalNode performs correctly as a Paxos
   * <em>Acceptor</em> process.
   */
  @Test(timeout=100000)
  public void testAcceptRecoveryBehavior() throws Exception {
    // We need to run newEpoch() first, or else we have no way to distinguish
    // different proposals for the same decision.
    try {
      ch.prepareRecovery(1L).get();
      fail("Did not throw IllegalState when trying to run paxos without an epoch");
    } catch (ExecutionException ise) {
      GenericTestUtils.assertExceptionContains("bad epoch", ise);
    }
    
    ch.newEpoch(1).get();
    ch.setEpoch(1);
    
    // prepare() with no previously accepted value and no logs present
    PrepareRecoveryResponseProto prep = ch.prepareRecovery(1L).get();
    System.err.println("Prep: " + prep);
    assertFalse(prep.hasAcceptedInEpoch());
    assertFalse(prep.hasSegmentState());
    
    // Make a log segment, and prepare again -- this time should see the
    // segment existing.
    ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
    ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();

    prep = ch.prepareRecovery(1L).get();
    System.err.println("Prep: " + prep);
    assertFalse(prep.hasAcceptedInEpoch());
    assertTrue(prep.hasSegmentState());
    
    // accept() should save the accepted value in persistent storage
    ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();

    // So another prepare() call from a new epoch would return this value
    ch.newEpoch(2);
    ch.setEpoch(2);
    prep = ch.prepareRecovery(1L).get();
    assertEquals(1L, prep.getAcceptedInEpoch());
    assertEquals(1L, prep.getSegmentState().getEndTxId());
    
    // A prepare() or accept() call from an earlier epoch should now be rejected
    ch.setEpoch(1);
    try {
      ch.prepareRecovery(1L).get();
      fail("prepare from earlier epoch not rejected");
    } catch (ExecutionException ioe) {
      GenericTestUtils.assertExceptionContains(
          "epoch 1 is less than the last promised epoch 2",
          ioe);
    }
    try {
      ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
      fail("accept from earlier epoch not rejected");
    } catch (ExecutionException ioe) {
      GenericTestUtils.assertExceptionContains(
          "epoch 1 is less than the last promised epoch 2",
          ioe);
    }
  }
  
  @Test(timeout=100000)
  public void testFailToStartWithBadConfig() throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "non-absolute-path");
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    assertJNFailsToStart(conf, "should be an absolute path");
    
    // Existing file which is not a directory 
    File existingFile = new File(TEST_BUILD_DATA, "testjournalnodefile");
    assertTrue(existingFile.createNewFile());
    try {
      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
          existingFile.getAbsolutePath());
      assertJNFailsToStart(conf, "Not a directory");
    } finally {
      existingFile.delete();
    }
    
    // Directory which cannot be created
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
        Shell.WINDOWS ? "\\\\cannotBeCreated" : "/proc/does-not-exist");
    assertJNFailsToStart(conf, "Cannot create directory");
  }

  private static void assertJNFailsToStart(Configuration conf,
      String errString) {
    try {
      JournalNode jn = new JournalNode();
      jn.setConf(conf);
      jn.start();
    } catch (Exception e) {
      GenericTestUtils.assertExceptionContains(errString, e);
    }
  }
  
  /**
   * Simple test of how fast the code path is to write edits.
   * This isn't a true unit test, but can be run manually to
   * check performance.
   * 
   * At the time of development, this test ran in ~4sec on an
   * SSD-enabled laptop (1.8ms/batch).
   */
  @Test(timeout=100000)
  public void testPerformance() throws Exception {
    doPerfTest(8192, 1024); // 8MB
  }
  
  private void doPerfTest(int editsSize, int numEdits) throws Exception {
    byte[] data = new byte[editsSize];
    ch.newEpoch(1).get();
    ch.setEpoch(1);
    ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
    
    StopWatch sw = new StopWatch().start();
    for (int i = 1; i < numEdits; i++) {
      ch.sendEdits(1L, i, 1, data).get();
    }
    long time = sw.now(TimeUnit.MILLISECONDS);
    
    System.err.println("Wrote " + numEdits + " batches of " + editsSize +
        " bytes in " + time + "ms");
    float avgRtt = (float)time/(float)numEdits;
    long throughput = ((long)numEdits * editsSize * 1000L)/time;
    System.err.println("Time per batch: " + avgRtt + "ms");
    System.err.println("Throughput: " + throughput + " bytes/sec");
  }

  /**
   * Test case to check if JournalNode exits cleanly when httpserver or rpc
   * server fails to start. Call to JournalNode start should fail with bind
   * exception as the port is in use by the JN started in @Before routine
   */
  @Test
  public void testJournalNodeStartupFailsCleanly() {
    JournalNode jNode = Mockito.spy(new JournalNode());
    try {
      jNode.setConf(conf);
      jNode.start();
      fail("Should throw bind exception");
    } catch (Exception e) {
      GenericTestUtils
          .assertExceptionContains("java.net.BindException: Port in use", e);
    }
    Mockito.verify(jNode).stop(1);
  }

  @Test
  public void testJournalNodeSyncerNotStartWhenSyncDisabled()
      throws IOException {
    //JournalSyncer will not be started, as journalsync is not enabled
    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false);
    jn.getOrCreateJournal(journalId);
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

    //Trying by passing nameserviceId still journalnodesyncer should not start
    // IstriedJournalSyncerStartWithnsId should also be false
    jn.getOrCreateJournal(journalId, "mycluster");
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

  }

  @Test
  public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI()
      throws IOException {
    //JournalSyncer will not be started,
    // as shared edits hostnames are not resolved
    jn.getOrCreateJournal(journalId);
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

    //Trying by passing nameserviceId, now
    // IstriedJournalSyncerStartWithnsId should be set
    // but journalnode syncer will not be started,
    // as hostnames are not resolved
    jn.getOrCreateJournal(journalId, "mycluster");
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(true,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

  }

  @Test
  public void testJournalNodeSyncerNotStartWhenSyncEnabled()
      throws IOException {
    //JournalSyncer will not be started,
    // as shared edits hostnames are not resolved
    jn.getOrCreateJournal(journalId);
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

    //Trying by passing nameserviceId and resolve hostnames
    // now IstriedJournalSyncerStartWithnsId should be set
    // and also journalnode syncer will also be started
    setupStaticHostResolution(2, "jn");
    jn.getOrCreateJournal(journalId, "mycluster");
    Assert.assertEquals(true,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(true,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

  }


  @Test
  public void testJournalNodeSyncwithFederationTypeConfigWithNameServiceId()
      throws IOException {
    //JournalSyncer will not be started, as nameserviceId passed is null,
    // but configured shared edits dir is appended with nameserviceId
    setupStaticHostResolution(2, "journalnode");
    jn.getOrCreateJournal(journalId);
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

    //Trying by passing nameserviceId and resolve hostnames
    // now IstriedJournalSyncerStartWithnsId should be set
    // and also journalnode syncer will also be started

    jn.getOrCreateJournal(journalId, "ns1");
    Assert.assertEquals(true,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(true,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
  }

  @Test
  public void testJournalNodeSyncwithFederationTypeConfigWithNamenodeId()
      throws IOException {
    //JournalSyncer will not be started, as nameserviceId passed is null,
    // but configured shared edits dir is appended with nameserviceId +
    // namenodeId
    setupStaticHostResolution(2, "journalnode");
    jn.getOrCreateJournal(journalId);
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

    //Trying by passing nameserviceId and resolve hostnames
    // now IstriedJournalSyncerStartWithnsId should be set
    // and also journalnode syncer will also be started

    jn.getOrCreateJournal(journalId, "ns1");
    Assert.assertEquals(true,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(true,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
  }

  @Test
  public void
      testJournalNodeSyncwithFederationTypeIncorrectConfigWithNamenodeId()
      throws IOException {
    //JournalSyncer will not be started, as nameserviceId passed is null,
    // but configured shared edits dir is appended with nameserviceId +
    // namenodeId
    setupStaticHostResolution(2, "journalnode");
    jn.getOrCreateJournal(journalId);
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(false,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

    //Trying by passing nameserviceId and resolve hostnames
    // now IstriedJournalSyncerStartWithnsId should  be set
    // and  journalnode syncer will not  be started
    // as for each nnId, different shared Edits dir value is configured

    jn.getOrCreateJournal(journalId, "ns1");
    Assert.assertEquals(false,
        jn.getJournalSyncerStatus(journalId));
    Assert.assertEquals(true,
        jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
  }


  private void setupStaticHostResolution(int journalNodeCount,
                                         String hostname) {
    for (int i = 0; i < journalNodeCount; i++) {
      NetUtils.addStaticResolution(hostname + i,
          "localhost");
    }
  }

  @Test
  public void testConfNormalHandlerNumber() {
    int confHandlerNumber = jn.getConf().getInt(
        DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY,
        DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
    assertTrue(confHandlerNumber > 0);
    int handlerCount = jn.getRpcServer().getHandlerCount();
    assertEquals(confHandlerNumber, handlerCount);
    assertEquals(10, handlerCount);
  }

  @Test
  public void testConfAbnormalHandlerNumber() {
    int confHandlerCount = jn.getConf().getInt(
        DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY,
        DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
    assertTrue(confHandlerCount <= 0);
    int handlerCount = jn.getRpcServer().getHandlerCount();
    assertEquals(
        DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT,
        handlerCount);
  }
}