TestSeveralNameNodes.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.junit.Test;

/**
 * Test that we can start several and run with namenodes on the same minicluster
 */
public class TestSeveralNameNodes {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSeveralNameNodes.class);

  /** ms between failovers between NNs */
  private static final int TIME_BETWEEN_FAILOVERS = 1000;
  private static final int NUM_NAMENODES = 3;
  private static final int NUM_THREADS = 3;
  private static final int LIST_LENGTH = 50;
  /** ms for length of test */
  private static final long RUNTIME = 100000;

  @Test
  public void testCircularLinkedListWrites() throws Exception {
    HAStressTestHarness harness = new HAStressTestHarness();
    // setup the harness
    harness.setNumberOfNameNodes(NUM_NAMENODES);
    harness.addFailoverThread(TIME_BETWEEN_FAILOVERS);
    harness.conf.setInt(HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, 1000);
    harness.conf.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 128);

    final MiniDFSCluster cluster = harness.startCluster();
    try {
      cluster.waitActive();
      cluster.transitionToActive(0);

      // setup the a circular writer
      FileSystem fs = harness.getFailoverFs();
      TestContext context = harness.testCtx;
      List<CircularWriter> writers = new ArrayList<CircularWriter>();
      for (int i = 0; i < NUM_THREADS; i++) {
        Path p = new Path("/test-" + i);
        fs.mkdirs(p);
        CircularWriter writer = new CircularWriter(context, LIST_LENGTH, fs, p);
        writers.add(writer);
        context.addThread(writer);
      }
      harness.startThreads();

      // wait for all the writer threads to finish, or that we exceed the time
      long start = System.currentTimeMillis();
      while ((System.currentTimeMillis() - start) < RUNTIME &&
          writers.size() > 0) {
        for (int i = 0; i < writers.size(); i++) {
          CircularWriter writer = writers.get(i);
          // remove the writer from the ones to check
          if (writer.done.await(100, TimeUnit.MILLISECONDS)) {
            writers.remove(i--);
          }
        }
      }
      assertEquals(
          "Some writers didn't complete in expected runtime! Current writer state:"
              + writers, 0,
          writers.size());

      harness.stopThreads();
    } finally {
      System.err.println("===========================\n\n\n\n");
      harness.shutdown();
    }
  }

  private static class CircularWriter extends RepeatingTestThread {

    private final int maxLength;
    private final Path dir;
    private final FileSystem fs;
    private int currentListIndex = 0;
    private CountDownLatch done = new CountDownLatch(1);

    public CircularWriter(TestContext context, int listLength, FileSystem fs,
        Path parentDir) {
      super(context);
      this.fs = fs;
      this.maxLength = listLength;
      this.dir = parentDir;
    }

    @Override
    public String toString() {
      StringBuilder builder = new StringBuilder("Circular Writer:\n");
      builder.append("\t directory: " + dir + "\n");
      builder.append("\t target length: " + maxLength + "\n");
      // might be a little racy, but we just want a close count
      builder.append("\t current item: " + currentListIndex + "\n");
      builder.append("\t done: " + (done.getCount() == 0) + "\n");
      return builder.toString();
    }

    @Override
    public void doAnAction() throws Exception {
      if (currentListIndex == maxLength) {
        checkList();
        this.stopTestThread();
        done.countDown();
      } else {
        writeList();
      }
    }

    /**
     * Make sure we can traverse the entire linked list
     */
    private void checkList() throws IOException {
      for (int i = 0; i < maxLength; i++) {
        Path nextFile = getNextFile(i);
        if (!fs.exists(nextFile)) {
          throw new RuntimeException("Next file " + nextFile
              + " for list does not exist!");
        }
        // read the next file name
        FSDataInputStream in = fs.open(nextFile);
        nextFile = getNextFile(in.read());
        in.close();
      }

    }

    private void cleanup() throws IOException {
      if (!fs.delete(dir, true)) {
        throw new RuntimeException("Didn't correctly delete " + dir);
      }
      if (!fs.mkdirs(dir)) {
        throw new RuntimeException("Didn't correctly make directory " + dir);
      }
    }

    private void writeList() throws IOException {
      Path nextPath = getNextFile(currentListIndex++);
      LOG.info("Writing next file: " + nextPath);
      FSDataOutputStream file = fs.create(nextPath);
      file.write(currentListIndex);
      file.close();
    }

    private Path getNextFile(int i) {
      return new Path(dir, Integer.toString(i));
    }
  }
}