TestRouterFaultTolerant.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static java.util.Arrays.asList;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.refreshRoutersCaches;
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Test the handling of fault tolerant mount points in the Router.
 */
public class TestRouterFaultTolerant {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRouterFaultTolerant.class);

  /** Number of files to create for testing. */
  private static final int NUM_FILES = 10;
  /** Number of Routers for test. */
  private static final int NUM_ROUTERS = 2;


  /** Namenodes for the test per name service id (subcluster). */
  private Map<String, MockNamenode> namenodes = new HashMap<>();
  /** Routers for the test. */
  private List<Router> routers = new ArrayList<>();

  /** Run test tasks in parallel. */
  private ExecutorService service;


  /**
   * Start two mock Namenodes (ns0 and ns1) and {@link #NUM_ROUTERS} Routers
   * sharing the same State Store configuration. router0 is configured to
   * reject partial listings while the other Routers allow them.
   * @throws Exception If the cluster cannot be started.
   */
  @BeforeEach
  public void setup() throws Exception {
    LOG.info("Start the Namenodes");
    Configuration nnConf = new HdfsConfiguration();
    nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10);
    for (final String nsId : asList("ns0", "ns1")) {
      MockNamenode nn = new MockNamenode(nsId, nnConf);
      nn.transitionToActive();
      nn.addFileSystemMock();
      namenodes.put(nsId, nn);
    }

    LOG.info("Start the Routers");
    Configuration routerConf = new RouterConfigBuilder()
        .stateStore()
        .admin()
        .rpc()
        .build();
    // Bind to ephemeral ports so parallel test runs do not collide
    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
    routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
    // Speedup time outs
    routerConf.setTimeDuration(
        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
        500, TimeUnit.MILLISECONDS);

    Configuration stateStoreConf = getStateStoreConfiguration();
    stateStoreConf.setClass(
        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
        MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
    stateStoreConf.setClass(
        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
        MultipleDestinationMountTableResolver.class,
        FileSubclusterResolver.class);
    routerConf.addResource(stateStoreConf);

    for (int i = 0; i < NUM_ROUTERS; i++) {
      // router0 doesn't allow partial listing
      routerConf.setBoolean(
          RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, i != 0);

      final Router router = new Router();
      router.init(routerConf);
      router.start();
      routers.add(router);
    }

    LOG.info("Registering the subclusters in the Routers");
    registerSubclusters(
        routers, namenodes.values(), Collections.singleton("ns1"));

    service = Executors.newFixedThreadPool(10);
  }

  /**
   * Stop the Namenodes, the Routers and the task executor.
   * @throws Exception If the cluster cannot be stopped.
   */
  @AfterEach
  public void cleanup() throws Exception {
    LOG.info("Stopping the cluster");
    for (final MockNamenode nn : namenodes.values()) {
      nn.stop();
    }
    namenodes.clear();

    routers.forEach(Router::stop);
    routers.clear();

    if (service != null) {
      service.shutdown();
      service = null;
    }
  }

  /**
   * Update a mount table entry to be fault tolerant.
   * @param mountPoint Mount point to update.
   * @throws IOException If it cannot update the mount point.
   */
  private void updateMountPointFaultTolerant(final String mountPoint)
      throws IOException {
    Router router = getRandomRouter();
    RouterClient admin = getAdminClient(router);
    MountTableManager mountTable = admin.getMountTableManager();
    GetMountTableEntriesRequest getRequest =
        GetMountTableEntriesRequest.newInstance(mountPoint);
    GetMountTableEntriesResponse entries =
        mountTable.getMountTableEntries(getRequest);
    MountTable updateEntry = entries.getEntries().get(0);
    updateEntry.setFaultTolerant(true);
    UpdateMountTableEntryRequest updateRequest =
        UpdateMountTableEntryRequest.newInstance(updateEntry);
    UpdateMountTableEntryResponse updateResponse =
        mountTable.updateMountTableEntry(updateRequest);
    assertTrue(updateResponse.getStatus());

    refreshRoutersCaches(routers);
  }

  /**
   * Test the behavior of the Router when one of the subclusters in a mount
   * point fails. In particular, it checks if it can write files or not.
   * Related to {@link TestRouterRpcMultiDestination#testSubclusterDown()}.
   */
  @Test
  public void testWriteWithFailedSubcluster() throws Exception {

    LOG.info("Stop ns1 to simulate an unavailable subcluster");
    namenodes.get("ns1").stop();

    // Run the actual tests with each approach
    final List<Callable<Boolean>> tasks = new ArrayList<>();
    final List<DestinationOrder> orders = asList(
        DestinationOrder.HASH_ALL,
        DestinationOrder.SPACE,
        DestinationOrder.RANDOM,
        DestinationOrder.HASH);
    for (DestinationOrder order : orders) {
      tasks.add(() -> {
        testWriteWithFailedSubcluster(order);
        return true;
      });
    }
    TaskResults results = collectResults("Full tests", tasks);
    assertEquals(orders.size(), results.getSuccess());
  }

  /**
   * Test the behavior of the Router when one of the subclusters in a mount
   * point fails. It assumes that ns1 is already down.
   * @param order Destination order of the mount point.
   * @throws Exception If we cannot run the test.
   */
  private void testWriteWithFailedSubcluster(final DestinationOrder order)
      throws Exception {

    final FileSystem router0Fs = getFileSystem(routers.get(0));
    final FileSystem router1Fs = getFileSystem(routers.get(1));
    final FileSystem ns0Fs = getFileSystem(namenodes.get("ns0").getRPCPort());

    final String mountPoint = "/" + order + "-failsubcluster";
    final Path mountPath = new Path(mountPoint);
    LOG.info("Setup {} with order {}", mountPoint, order);
    createMountTableEntry(
        getRandomRouter(), mountPoint, order, namenodes.keySet());
    refreshRoutersCaches(routers);

    LOG.info("Write in {} should succeed writing in ns0 and fail for ns1",
        mountPath);
    checkDirectoriesFaultTolerant(
        mountPath, order, router0Fs, router1Fs, ns0Fs, false);
    checkFilesFaultTolerant(
        mountPath, order, router0Fs, router1Fs, ns0Fs, false);

    LOG.info("Make {} fault tolerant and everything succeeds", mountPath);
    IOException ioe = null;
    try {
      updateMountPointFaultTolerant(mountPoint);
    } catch (IOException e) {
      ioe = e;
    }
    if (DestinationOrder.FOLDER_ALL.contains(order)) {
      assertNull(ioe);
      checkDirectoriesFaultTolerant(
          mountPath, order, router0Fs, router1Fs, ns0Fs, true);
      checkFilesFaultTolerant(
          mountPath, order, router0Fs, router1Fs, ns0Fs, true);
    } else {
      // Fail with a clear message instead of an NPE if the update
      // unexpectedly succeeded for a non-ALL order
      assertNotNull(ioe,
          "Expected fault tolerance to be rejected for order " + order);
      assertTrue(ioe.getMessage().startsWith(
          "Invalid entry, fault tolerance only supported for ALL order"));
    }
  }

  /**
   * Check directory creation on a mount point.
   * If it is fault tolerant, it should be able to write everything.
   * If it is not fault tolerant, it should fail to write some.
   * @param mountPoint Mount point to create the directories in.
   * @param order Destination order of the mount point.
   * @param router0Fs File system of router0 (partial listing disallowed).
   * @param router1Fs File system of router1 (partial listing allowed).
   * @param ns0Fs File system of ns0 (currently not used by this check).
   * @param faultTolerant If the mount point is fault tolerant.
   * @throws Exception If the tasks cannot be run.
   */
  private void checkDirectoriesFaultTolerant(
      Path mountPoint, DestinationOrder order,
      FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
      boolean faultTolerant) throws Exception {

    final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);

    LOG.info("Create directories in {}", mountPoint);
    final List<Callable<Boolean>> tasks = new ArrayList<>();
    for (int i = 0; i < NUM_FILES; i++) {
      final Path dir = new Path(mountPoint,
          String.format("dir-%s-%03d", faultTolerant, i));
      FileSystem fs = getRandomRouterFileSystem();
      tasks.add(getDirCreateTask(fs, dir));
    }
    TaskResults results = collectResults("Create dir " + mountPoint, tasks);

    LOG.info("Check directories results for {}: {}", mountPoint, results);
    if (faultTolerant || DestinationOrder.FOLDER_ALL.contains(order)) {
      assertEquals(NUM_FILES, results.getSuccess());
      assertEquals(0, results.getFailure());
    } else {
      assertBothResults("check dir " + mountPoint, NUM_FILES, results);
    }

    LOG.info("Check directories listing for {}", mountPoint);
    // collectResults() clears the task list, so it can be reused here
    tasks.add(getListFailTask(router0Fs, mountPoint));
    int filesExpected = dirs0.length + results.getSuccess();
    tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
    results = collectResults("List " + mountPoint, tasks);
    assertEquals(2, results.getSuccess(), "Failed listing");

    tasks.add(getContentSummaryFailTask(router0Fs, mountPoint));
    tasks.add(getContentSummarySuccessTask(
        router1Fs, mountPoint, filesExpected));
    results = collectResults("Content summary "  + mountPoint, tasks);
    assertEquals(2, results.getSuccess(), "Failed content summary");
  }

  /**
   * Check file creation on a mount point.
   * If it is fault tolerant, it should be able to write everything.
   * If it is not fault tolerant, it should fail to write some of the files.
   * @param mountPoint Mount point whose first sub directory is used.
   * @param order Destination order of the mount point.
   * @param router0Fs File system of router0 (partial listing disallowed).
   * @param router1Fs File system of router1 (partial listing allowed).
   * @param ns0Fs File system of ns0 used to verify the created files.
   * @param faultTolerant If the mount point is fault tolerant.
   * @throws Exception If the tasks cannot be run.
   */
  private void checkFilesFaultTolerant(
      Path mountPoint, DestinationOrder order,
      FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
      boolean faultTolerant) throws Exception {

    // Get one of the existing sub directories
    final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
    assertTrue(dirs0.length > 0, "No sub directories found in " + mountPoint);
    final Path dir0 = Path.getPathWithoutSchemeAndAuthority(
        dirs0[0].getPath());

    LOG.info("Create files in {}",  dir0);
    final List<Callable<Boolean>> tasks = new ArrayList<>();
    for (int i = 0; i < NUM_FILES; i++) {
      final String newFile = String.format("%s/file-%03d.txt", dir0, i);
      FileSystem fs = getRandomRouterFileSystem();
      tasks.add(getFileCreateTask(fs, newFile, ns0Fs));
    }
    TaskResults results = collectResults("Create file " + dir0, tasks);

    LOG.info("Check files results for {}: {}", dir0, results);
    if (faultTolerant) {
      assertEquals(NUM_FILES, results.getSuccess(), "Not enough success in " + mountPoint);
      assertEquals(0, results.getFailure(), "Nothing should fail in " + mountPoint);
    } else {
      assertEquals(0, results.getSuccess(), "Nothing should succeed in " + mountPoint);
      assertEquals(NUM_FILES, results.getFailure(), "Everything should fail in " + mountPoint);
    }

    LOG.info("Check files listing for {}", dir0);
    tasks.add(getListFailTask(router0Fs, dir0));
    tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
    assertEquals(2, collectResults("List " + dir0, tasks).getSuccess());

    tasks.add(getContentSummaryFailTask(router0Fs, dir0));
    tasks.add(getContentSummarySuccessTask(
        router1Fs, dir0, results.getSuccess()));
    results = collectResults("Content summary "  + dir0, tasks);
    assertEquals(2, results.getSuccess());
  }

  /**
   * Get the string representation for the files.
   * @param files Files to check.
   * @return String representation.
   */
  private static String toString(final FileStatus[] files) {
    final StringBuilder sb = new StringBuilder();
    sb.append("[");
    for (final FileStatus file : files) {
      if (sb.length() > 1) {
        sb.append(", ");
      }
      sb.append(Path.getPathWithoutSchemeAndAuthority(file.getPath()));
    }
    sb.append("]");
    return sb.toString();
  }

  /**
   * List the files in a path.
   * @param fs File system to check.
   * @param path Path to list.
   * @return List of files; empty if the path does not exist.
   * @throws IOException If we cannot list.
   */
  private FileStatus[] listStatus(final FileSystem fs, final Path path)
      throws IOException {
    FileStatus[] files = new FileStatus[] {};
    try {
      files = fs.listStatus(path);
    } catch (FileNotFoundException fnfe) {
      LOG.debug("File not found: {}", fnfe.getMessage());
    }
    return files;
  }

  /**
   * Task that creates a file and checks if it is available.
   * @param fs File system used to create the file.
   * @param file File to create.
   * @param checkFs File system for checking if the file is properly created.
   * @return Result of creating the file; false if a RemoteException is
   *         raised.
   */
  private static Callable<Boolean> getFileCreateTask(
      final FileSystem fs, final String file, FileSystem checkFs) {
    return () -> {
      try {
        Path path = new Path(file);
        FSDataOutputStream os = fs.create(path);
        // We don't write because we have no mock Datanodes
        os.close();
        FileStatus fileStatus = checkFs.getFileStatus(path);
        assertTrue(fileStatus.getLen() > 0, "File not created properly: " + fileStatus);
        return true;
      } catch (RemoteException re) {
        return false;
      }
    };
  }

  /**
   * Task that creates a directory.
   * @param fs File system used to create the directory.
   * @param dir Directory to create.
   * @return Result of creating the directory; false if a RemoteException is
   *         raised.
   */
  private static Callable<Boolean> getDirCreateTask(
      final FileSystem fs, final Path dir) {
    return () -> {
      try {
        fs.mkdirs(dir);
        return true;
      } catch (RemoteException re) {
        return false;
      }
    };
  }

  /**
   * Task that lists a directory and expects to fail.
   * @param fs File system to check.
   * @param path Path to try to list.
   * @return If the listing failed as expected.
   */
  private static Callable<Boolean> getListFailTask(FileSystem fs, Path path) {
    return () -> {
      try {
        fs.listStatus(path);
        return false;
      } catch (RemoteException re) {
        return true;
      }
    };
  }

  /**
   * Task that lists a directory and succeeds.
   * @param fs File system to check.
   * @param path Path to list.
   * @param expected Number of files to expect to find.
   * @return If the listing succeeds.
   */
  private static Callable<Boolean> getListSuccessTask(
      FileSystem fs, Path path, int expected) {
    return () -> {
      final FileStatus[] dirs = fs.listStatus(path);
      assertEquals(expected, dirs.length, toString(dirs));
      return true;
    };
  }


  /**
   * Task that gets the content summary of a path and expects to fail.
   * @param fs File system to check.
   * @param path Path to get the content summary for.
   * @return If getting the content summary failed as expected.
   */
  private static Callable<Boolean> getContentSummaryFailTask(
      FileSystem fs, Path path) {
    return () -> {
      try {
        fs.getContentSummary(path);
        return false;
      } catch (RemoteException re) {
        return true;
      }
    };
  }

  /**
   * Task that gets the content summary of a path and succeeds.
   * @param fs File system to check.
   * @param path Path to get the content summary for.
   * @param expected Expected number of files and directories in the summary.
   * @return If getting the content summary succeeds.
   */
  private static Callable<Boolean> getContentSummarySuccessTask(
      FileSystem fs, Path path, int expected) {
    return () -> {
      ContentSummary summary = fs.getContentSummary(path);
      assertEquals(expected, summary.getFileAndDirectoryCount(), "Wrong summary for " + path);
      return true;
    };
  }

  /**
   * Invoke a set of tasks in {@link #service} and collect their outputs.
   * The tasks should do assertions. The task collection is cleared afterwards
   * so callers can reuse it.
   *
   * @param tag Description of the tasks used in logs and failure messages.
   * @param tasks Tasks to run.
   * @return Number of tasks that returned true (success) or false (failure).
   * @throws Exception If it cannot collect the results.
   */
  private TaskResults collectResults(final String tag,
      final Collection<Callable<Boolean>> tasks) throws Exception {
    final TaskResults results = new TaskResults();
    service.invokeAll(tasks).forEach(task -> {
      try {
        boolean succeeded = task.get();
        if (succeeded) {
          LOG.info("Got success for {}", tag);
          results.incrSuccess();
        } else {
          LOG.info("Got failure for {}", tag);
          results.incrFailure();
        }
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Preserve the interrupt status for the test runner
          Thread.currentThread().interrupt();
        }
        StringWriter stackTrace = new StringWriter();
        PrintWriter writer = new PrintWriter(stackTrace);
        if (e instanceof ExecutionException) {
          // Report the failure raised inside the task, not the wrapper
          e.getCause().printStackTrace(writer);
        } else {
          e.printStackTrace(writer);
        }
        fail("Failed to run \"" + tag + "\": " + stackTrace);
      }
    });
    tasks.clear();
    return results;
  }

  /**
   * Class to summarize the results of running a task.
   */
  static class TaskResults {
    private final AtomicInteger success = new AtomicInteger(0);
    private final AtomicInteger failure = new AtomicInteger(0);
    public void incrSuccess() {
      success.incrementAndGet();
    }
    public void incrFailure() {
      failure.incrementAndGet();
    }
    public int getSuccess() {
      return success.get();
    }
    public int getFailure() {
      return failure.get();
    }
    public int getTotal() {
      return success.get() + failure.get();
    }
    @Override
    public String toString() {
      return new StringBuilder()
          .append("Success=").append(getSuccess())
          .append(" Failure=").append(getFailure())
          .toString();
    }
  }

  /**
   * Asserts that the results are the expected amount, and it has both success
   * and failure.
   * @param msg Message to show when the assertion fails.
   * @param expected Expected number of results.
   * @param actual Actual results.
   */
  private static void assertBothResults(String msg,
      int expected, TaskResults actual) {
    assertEquals(expected, actual.getTotal(), msg);
    assertTrue(actual.getSuccess() > 0, "Expected some success for " + msg);
    assertTrue(actual.getFailure() > 0, "Expected some failure for " + msg);
  }

  /**
   * Get a random Router from the cluster.
   * @return Random Router.
   */
  private Router getRandomRouter() {
    Random rnd = new Random();
    int index = rnd.nextInt(routers.size());
    return routers.get(index);
  }

  /**
   * Get a file system from one of the Routers as a random user to allow better
   * concurrency in the Router.
   * @return File system from a random user.
   * @throws Exception If we cannot create the file system.
   */
  private FileSystem getRandomRouterFileSystem() throws Exception {
    final UserGroupInformation userUgi =
        UserGroupInformation.createUserForTesting(
            "user-" + UUID.randomUUID(), new String[]{"group"});
    Router router = getRandomRouter();
    return userUgi.doAs(
        (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
  }

  /**
   * Test that reads through a HASH_ALL mount point return
   * FileNotFoundException for missing files while every subcluster is up.
   * Once the subcluster holding an existing file is stopped, reading that
   * file must report an unavailable subcluster instead.
   * @throws Exception If the test cannot run.
   */
  @Test
  public void testReadWithFailedSubcluster() throws Exception {

    DestinationOrder order = DestinationOrder.HASH_ALL;
    final String mountPoint = "/" + order + "-testread";
    final Path mountPath = new Path(mountPoint);
    LOG.info("Setup {} with order {}", mountPoint, order);
    createMountTableEntry(
        routers, mountPoint, order, namenodes.keySet());

    FileSystem fs = getRandomRouterFileSystem();

    // Create a file (we don't write because we have no mock Datanodes)
    final Path fileexisting = new Path(mountPath, "fileexisting");
    final Path filenotexisting = new Path(mountPath, "filenotexisting");
    try (FSDataOutputStream os = fs.create(fileexisting)) {
      assertNotNull(os);
    }

    // We should be able to read existing files
    try (FSDataInputStream fsdis = fs.open(fileexisting)) {
      assertNotNull(fsdis, "We should be able to read the file");
    }
    // We shouldn't be able to read non-existing files
    LambdaTestUtils.intercept(FileNotFoundException.class,
        () -> fs.open(filenotexisting));

    // Check the subcluster where the file got created
    String nsIdWithFile = null;
    for (Entry<String, MockNamenode> entry : namenodes.entrySet()) {
      String nsId = entry.getKey();
      MockNamenode nn = entry.getValue();
      int rpc = nn.getRPCPort();
      FileSystem nnfs = getFileSystem(rpc);

      try {
        FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
        assertNotNull(fileStatus);
        assertNull(nsIdWithFile, "The file cannot be in two subclusters");
        nsIdWithFile = nsId;
      } catch (FileNotFoundException fnfe) {
        LOG.debug("File not found in {}", nsId);
      }
    }
    assertNotNull(nsIdWithFile, "The file has to be in one subcluster");

    LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
    namenodes.get(nsIdWithFile).stop();

    // We should not get FileNotFoundException anymore
    try {
      fs.open(fileexisting);
      fail("It should throw an unavailable cluster exception");
    } catch(RemoteException re) {
      IOException ioe = re.unwrapRemoteException();
      assertTrue(RouterRpcClient.isUnavailableException(ioe),
          "Expected an unavailable exception for:" + ioe.getClass());
    }
  }
}