TestRouterAllResolver.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests the use of the resolvers that write in all subclusters from the
 * Router. It supports:
 * <li>HashResolver
 * <li>RandomResolver.
 */
public class TestRouterAllResolver {

  /** Directory that will be in a HASH_ALL mount point. */
  private static final String TEST_DIR_HASH_ALL = "/hashall";
  /** Directory that will be in a RANDOM mount point. */
  private static final String TEST_DIR_RANDOM = "/random";
  /** Directory that will be in a SPACE mount point. */
  private static final String TEST_DIR_SPACE = "/space";

  /** Number of namespaces. */
  private static final int NUM_NAMESPACES = 2;


  /** Mini HDFS clusters with Routers and State Store. */
  private static StateStoreDFSCluster cluster;
  /** Router for testing. */
  private static RouterContext routerContext;
  /** Router/federated filesystem. */
  private static FileSystem routerFs;
  /** Filesystem for each namespace. */
  private static List<FileSystem> nsFss = new LinkedList<>();


  @BeforeEach
  public void setup() throws Exception {
    // 2 nameservices with 1 namenode each (no HA needed for this test)
    cluster = new StateStoreDFSCluster(
        false, NUM_NAMESPACES, MultipleDestinationMountTableResolver.class);

    // Start NNs and DNs and wait until ready
    cluster.startCluster();

    // Build and start a Router with: State Store + Admin + RPC
    Configuration routerConf = new RouterConfigBuilder()
        .stateStore()
        .admin()
        .rpc()
        .build();
    cluster.addRouterOverrides(routerConf);
    cluster.startRouters();
    routerContext = cluster.getRandomRouter();

    // Register and verify all NNs with all routers
    cluster.registerNamenodes();
    cluster.waitNamenodeRegistration();

    // Setup the test mount point
    createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
    createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
    createMountTableEntry(TEST_DIR_SPACE, DestinationOrder.SPACE);

    // Get filesystems for federated and each namespace
    routerFs = routerContext.getFileSystem();
    for (String nsId : cluster.getNameservices()) {
      List<NamenodeContext> nns = cluster.getNamenodes(nsId);
      for (NamenodeContext nn : nns) {
        FileSystem nnFs = nn.getFileSystem();
        nsFss.add(nnFs);
      }
    }
    assertEquals(NUM_NAMESPACES, nsFss.size());
  }

  @AfterEach
  public void cleanup() {
    cluster.shutdown();
    cluster = null;
    routerContext = null;
    routerFs = null;
    nsFss.clear();
  }

  @Test
  public void testHashAll() throws Exception {
    testAll(TEST_DIR_HASH_ALL);
  }

  @Test
  public void testRandomAll() throws Exception {
    testAll(TEST_DIR_RANDOM);
  }

  @Test
  public void testSpaceAll() throws Exception {
    testAll(TEST_DIR_SPACE);
  }

  /**
   * Tests that the resolver spreads files across subclusters in the whole
   * tree.
   * @throws Exception If the resolver is not working.
   */
  private void testAll(final String path) throws Exception {

    // Create directories in different levels
    routerFs.mkdirs(new Path(path + "/dir0"));
    routerFs.mkdirs(new Path(path + "/dir1"));
    routerFs.mkdirs(new Path(path + "/dir2/dir20"));
    routerFs.mkdirs(new Path(path + "/dir2/dir21"));
    routerFs.mkdirs(new Path(path + "/dir2/dir22"));
    routerFs.mkdirs(new Path(path + "/dir2/dir22/dir220"));
    routerFs.mkdirs(new Path(path + "/dir2/dir22/dir221"));
    routerFs.mkdirs(new Path(path + "/dir2/dir22/dir222"));
    assertDirsEverywhere(path, 9);

    // Create 14 files at different levels of the tree
    createTestFile(routerFs, path + "/dir0/file1.txt");
    createTestFile(routerFs, path + "/dir0/file2.txt");
    createTestFile(routerFs, path + "/dir1/file2.txt");
    createTestFile(routerFs, path + "/dir1/file3.txt");
    createTestFile(routerFs, path + "/dir2/dir20/file4.txt");
    createTestFile(routerFs, path + "/dir2/dir20/file5.txt");
    createTestFile(routerFs, path + "/dir2/dir21/file6.txt");
    createTestFile(routerFs, path + "/dir2/dir21/file7.txt");
    createTestFile(routerFs, path + "/dir2/dir22/file8.txt");
    createTestFile(routerFs, path + "/dir2/dir22/file9.txt");
    createTestFile(routerFs, path + "/dir2/dir22/dir220/file10.txt");
    createTestFile(routerFs, path + "/dir2/dir22/dir220/file11.txt");
    createTestFile(routerFs, path + "/dir2/dir22/dir220/file12.txt");
    createTestFile(routerFs, path + "/dir2/dir22/dir220/file13.txt");
    assertDirsEverywhere(path, 9);
    assertFilesDistributed(path, 14);

    // Test append
    String testFile = path + "/dir2/dir22/dir220/file-append.txt";
    createTestFile(routerFs, testFile);
    Path testFilePath = new Path(testFile);
    assertTrue(routerFs.getFileStatus(testFilePath).getLen() > 50, "Created file is too small");
    appendTestFile(routerFs, testFile);
    assertTrue(routerFs.getFileStatus(testFilePath).getLen() > 110, "Append file is too small");
    assertDirsEverywhere(path, 9);
    assertFilesDistributed(path, 15);

    // Test truncate
    String testTruncateFile = path + "/dir2/dir22/dir220/file-truncate.txt";
    createTestFile(routerFs, testTruncateFile);
    Path testTruncateFilePath = new Path(testTruncateFile);
    routerFs.truncate(testTruncateFilePath, 10);
    TestFileTruncate.checkBlockRecovery(testTruncateFilePath,
        (DistributedFileSystem) routerFs);
    assertEquals(10, routerFs.getFileStatus(testTruncateFilePath).getLen(), "Truncate file fails");
    assertDirsEverywhere(path, 9);
    assertFilesDistributed(path, 16);

    // Removing a directory should remove it from every subcluster
    routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true);
    assertDirsEverywhere(path, 8);
    assertFilesDistributed(path, 10);

    // Removing all sub directories
    routerFs.delete(new Path(path + "/dir0"), true);
    routerFs.delete(new Path(path + "/dir1"), true);
    routerFs.delete(new Path(path + "/dir2"), true);
    assertDirsEverywhere(path, 0);
    assertFilesDistributed(path, 0);
  }

  /**
   * Directories in HASH_ALL mount points must be in every namespace.
   * @param path Path to check under.
   * @param expectedNumDirs Expected number of directories.
   * @throws IOException If it cannot check the directories.
   */
  private void assertDirsEverywhere(String path, int expectedNumDirs)
      throws IOException {

    // Check for the directories in each filesystem
    List<FileStatus> files = listRecursive(routerFs, path);
    int numDirs = 0;
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        numDirs++;

        Path dirPath = file.getPath();
        Path checkPath = getRelativePath(dirPath);
        for (FileSystem nsFs : nsFss) {
          FileStatus fileStatus1 = nsFs.getFileStatus(checkPath);
          assertTrue(fileStatus1.isDirectory(), file + " should be a directory");
        }
      }
    }
    assertEquals(expectedNumDirs, numDirs);
  }

  /**
   * Check that the files are somewhat spread across namespaces.
   * @param path Path to check under.
   * @param expectedNumFiles Number of files expected.
   * @throws IOException If the files cannot be checked.
   */
  private void assertFilesDistributed(String path, int expectedNumFiles)
      throws IOException {

    // Check where the files went
    List<FileStatus> routerFiles = listRecursive(routerFs, path);
    List<List<FileStatus>> nssFiles = new LinkedList<>();
    for (FileSystem nsFs : nsFss) {
      List<FileStatus> nsFiles = listRecursive(nsFs, path);
      nssFiles.add(nsFiles);
    }

    // We should see all the files in the federated view
    int numRouterFiles = getNumTxtFiles(routerFiles);
    assertEquals(numRouterFiles, expectedNumFiles);

    // All the files should be spread somewhat evenly across subclusters
    List<Integer> numNsFiles = new LinkedList<>();
    int sumNsFiles = 0;
    for (int i = 0; i < NUM_NAMESPACES; i++) {
      List<FileStatus> nsFiles = nssFiles.get(i);
      int numFiles = getNumTxtFiles(nsFiles);
      numNsFiles.add(numFiles);
      sumNsFiles += numFiles;
    }
    assertEquals(numRouterFiles, sumNsFiles);
    if (expectedNumFiles > 0) {
      for (int numFiles : numNsFiles) {
        assertTrue(numFiles > 0, "Files not distributed: " + numNsFiles);
      }
    }
  }

  /**
   * Create a test file in the filesystem and check if it was written.
   * @param fs Filesystem.
   * @param filename Name of the file to create.
   * @throws IOException If it cannot create the file.
   */
  private static void createTestFile(
      final FileSystem fs, final String filename)throws IOException {

    final Path path = new Path(filename);

    // Write the data
    FSDataOutputStream os = fs.create(path);
    os.writeUTF("Test data " + filename);
    os.close();

    // Read the data and check
    FSDataInputStream is = fs.open(path);
    String read = is.readUTF();
    assertEquals("Test data " + filename, read);
    is.close();
  }

  /**
   * Append to a test file in the filesystem and check if we appended.
   * @param fs Filesystem.
   * @param filename Name of the file to append to.
   * @throws IOException
   */
  private static void appendTestFile(
      final FileSystem fs, final String filename) throws IOException {
    final Path path = new Path(filename);

    // Write the data
    FSDataOutputStream os = fs.append(path);
    os.writeUTF("Test append data " + filename);
    os.close();

    // Read the data previous data
    FSDataInputStream is = fs.open(path);
    String read = is.readUTF();
    assertEquals(read, "Test data " + filename);
    // Read the new data and check
    read = is.readUTF();
    assertEquals(read, "Test append data " + filename);
    is.close();
  }

  /**
   * Count the number of text files in a list.
   * @param files File list.
   * @return Number of .txt files.
   */
  private static int getNumTxtFiles(final List<FileStatus> files) {
    int numFiles = 0;
    for (FileStatus file : files) {
      if (file.getPath().getName().endsWith(".txt")) {
        numFiles++;
      }
    }
    return numFiles;
  }

  /**
   * Get the relative path within a filesystem (removes the filesystem prefix).
   * @param path Path to check.
   * @return File within the filesystem.
   */
  private static Path getRelativePath(final Path path) {
    URI uri = path.toUri();
    String uriPath = uri.getPath();
    return new Path(uriPath);
  }

  /**
   * Get the list the files/dirs under a path.
   * @param fs Filesystem to check in.
   * @param path Path to check for.
   * @return List of files.
   * @throws IOException If it cannot list the files.
   */
  private List<FileStatus> listRecursive(
      final FileSystem fs, final String path) throws IOException {
    List<FileStatus> ret = new LinkedList<>();
    List<Path> temp = new LinkedList<>();
    temp.add(new Path(path));
    while (!temp.isEmpty()) {
      Path p = temp.remove(0);
      for (FileStatus fileStatus : fs.listStatus(p)) {
        ret.add(fileStatus);
        if (fileStatus.isDirectory()) {
          temp.add(fileStatus.getPath());
        }
      }
    }
    return ret;
  }

  /**
   * Add a mount table entry in all nameservices and wait until it is
   * available in all routers.
   * @param mountPoint Name of the mount point.
   * @param order Order of the mount table entry.
   * @throws Exception If the entry could not be created.
   */
  private void createMountTableEntry(
      final String mountPoint, final DestinationOrder order) throws Exception {

    RouterClient admin = routerContext.getAdminClient();
    MountTableManager mountTable = admin.getMountTableManager();
    Map<String, String> destMap = new HashMap<>();
    for (String nsId : cluster.getNameservices()) {
      destMap.put(nsId, mountPoint);
    }
    MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
    newEntry.setDestOrder(order);
    AddMountTableEntryRequest addRequest =
        AddMountTableEntryRequest.newInstance(newEntry);
    AddMountTableEntryResponse addResponse =
        mountTable.addMountTableEntry(addRequest);
    boolean created = addResponse.getStatus();
    assertTrue(created);

    // Refresh the caches to get the mount table
    Router router = routerContext.getRouter();
    StateStoreService stateStore = router.getStateStore();
    stateStore.refreshCaches(true);

    // Check for the path
    GetMountTableEntriesRequest getRequest =
        GetMountTableEntriesRequest.newInstance(mountPoint);
    GetMountTableEntriesResponse getResponse =
        mountTable.getMountTableEntries(getRequest);
    List<MountTable> entries = getResponse.getEntries();
    assertEquals(1, entries.size());
    assertEquals(mountPoint, entries.get(0).getSourcePath());
  }
}