// TestViewFileSystemOverloadSchemeWithHdfsScheme.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.viewfs;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;


/**
 * Tests ViewFileSystemOverloadScheme with configured mount links.
 */
public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
  private static final String TEST_STRING = "Hello ViewFSOverloadedScheme!";
  private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
  private static final String HDFS_SCHEME = "hdfs";
  private Configuration conf = null;
  private static MiniDFSCluster cluster = null;
  private URI defaultFSURI;
  private File localTargetDir;
  private static final String TEST_ROOT_DIR = PathUtils
      .getTestDirName(TestViewFileSystemOverloadSchemeWithHdfsScheme.class);
  private static final String HDFS_USER_FOLDER = "/HDFSUser";
  private static final String LOCAL_FOLDER = "/local";

  /**
   * Starts a shared 2-datanode MiniDFSCluster once for all tests.
   */
  @BeforeAll
  public static void init() throws IOException {
    cluster =
        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(2).build();
    cluster.waitClusterUp();
  }

  /**
   * Sets up the configurations and starts the MiniDFSCluster.
   */
  @BeforeEach
  public void setUp() throws IOException {
    Configuration config = getNewConf();
    config.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
    // Overload the hdfs scheme so that FileSystem.get(conf) returns a
    // ViewFileSystemOverloadScheme instead of DistributedFileSystem.
    config.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
        ViewFileSystemOverloadScheme.class.getName());
    config.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
        CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT);
    setConf(config);
    defaultFSURI =
        URI.create(config.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
    localTargetDir = new File(TEST_ROOT_DIR, "/root/");
    localTargetDir.mkdirs();
    assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
  }

  /**
   * Empties the cluster's root directory and drops all cached file systems
   * so each test starts from a clean slate.
   */
  @AfterEach
  public void cleanUp() throws IOException {
    if (cluster != null) {
      // Use DistributedFileSystem directly to bypass the overloaded hdfs
      // scheme when wiping the cluster contents.
      FileSystem fs = new DistributedFileSystem();
      fs.initialize(defaultFSURI, conf);
      try {
        FileStatus[] statuses = fs.listStatus(new Path("/"));
        for (FileStatus st : statuses) {
          assertTrue(fs.delete(st.getPath(), true));
        }
      } finally {
        fs.close();
      }
      FileSystem.closeAll();
    }
  }

  @AfterAll
  public static void tearDown() throws IOException {
    if (cluster != null) {
      FileSystem.closeAll();
      cluster.shutdown();
    }
  }

  /**
   * Adds the given mount links to config. sources contains mount link src and
   * the respective index location in targets contains the target uri.
   */
  void addMountLinks(String mountTable, String[] sources, String[] targets,
      Configuration config) throws IOException, URISyntaxException {
    ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
  }

  /**
   * Create mount links as follows.
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   *
   * create file /HDFSUser/testfile should create in hdfs
   * create file /local/test should create directory in local fs
   */
  @Test
  @Timeout(value = 30)
  public void testMountLinkWithLocalAndHDFS() throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    final Path localTargetPath = new Path(localTargetDir.toURI());

    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString() },
        conf);

    // /HDFSUser/testfile
    Path hdfsFile = new Path(HDFS_USER_FOLDER + "/testfile");
    // /local/test
    Path localDir = new Path(LOCAL_FOLDER + "/test");

    try (FileSystem fs
        =  FileSystem.get(conf)) {
      assertEquals(2, fs.getChildFileSystems().length);
      fs.createNewFile(hdfsFile); // /HDFSUser/testfile
      fs.mkdirs(localDir); // /local/test
    }

    // Initialize HDFS and test files exist in ls or not
    try (DistributedFileSystem dfs = new DistributedFileSystem()) {
      dfs.initialize(defaultFSURI, conf);
      assertTrue(dfs.exists(
          new Path(Path.getPathWithoutSchemeAndAuthority(hdfsTargetPath),
              hdfsFile.getName()))); // should be in hdfs.
      assertFalse(dfs.exists(
          new Path(Path.getPathWithoutSchemeAndAuthority(localTargetPath),
              localDir.getName()))); // should not be in local fs.
    }

    try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
      lfs.initialize(localTargetPath.toUri(), conf);
      assertFalse(lfs.exists(
          new Path(Path.getPathWithoutSchemeAndAuthority(hdfsTargetPath),
              hdfsFile.getName()))); // should not be in hdfs.
      assertTrue(lfs.exists(
          new Path(Path.getPathWithoutSchemeAndAuthority(localTargetPath),
              localDir.getName()))); // should be in local fs.
    }
  }

  /**
   * Create mount links as follows.
   * hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/
   * It should fail to add non existent fs link.
   */
  @Test
  @Timeout(value = 30)
  public void testMountLinkWithNonExistentLink() throws Exception {
    testMountLinkWithNonExistentLink(true);
  }

  public void testMountLinkWithNonExistentLink(boolean expectFsInitFailure)
      throws Exception {
    final String userFolder = "/User";
    final Path nonExistTargetPath =
        new Path("nonexistent://NonExistent" + userFolder);

    /**
     * Below addLink will create following mount points
     * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
     */
    addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder},
        new String[] {nonExistTargetPath.toUri().toString()}, conf);
    if (expectFsInitFailure) {
      LambdaTestUtils.intercept(IOException.class, () -> {
        // try-with-resources so the fs is closed even though resolvePath is
        // expected to throw; the IOException still propagates to intercept.
        try (FileSystem fs = FileSystem.get(conf)) {
          fs.resolvePath(new Path(userFolder));
        }
      });
    } else {
      try (FileSystem fs = FileSystem.get(conf)) {
        assertEquals("hdfs", fs.getScheme());
      }
    }
  }

  /**
   * Create mount links as follows.
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   * ListStatus on / should list the mount links.
   */
  @Test
  @Timeout(value = 30)
  public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString() },
        conf);
    try (DistributedFileSystem dfs = new DistributedFileSystem()) {
      dfs.initialize(defaultFSURI, conf);
      dfs.mkdirs(hdfsTargetPath);
    }

    try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
      lfs.initialize(localTargetDir.toURI(), conf);
      lfs.mkdirs(new Path(localTargetDir.toURI()));
    }
    try (FileSystem fs = FileSystem.get(conf)) {
      fs.mkdirs(hdfsTargetPath);
      FileStatus[] ls = fs.listStatus(new Path("/"));
      assertEquals(2, ls.length);
      String lsPath1 =
          Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).toString();
      String lsPath2 =
          Path.getPathWithoutSchemeAndAuthority(ls[1].getPath()).toString();
      // The two entries must be the two distinct mount links, in any order.
      assertFalse(lsPath1.equals(lsPath2));
      assertTrue(HDFS_USER_FOLDER.equals(lsPath1) || LOCAL_FOLDER.equals(lsPath1));
      assertTrue(HDFS_USER_FOLDER.equals(lsPath2) || LOCAL_FOLDER.equals(lsPath2));
    }
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   * ListStatus non mount directory should fail.
   */
  @Test
  @Timeout(value = 30)
  public void testListStatusOnNonMountedPath() throws Exception {
    assertThrows(IOException.class, () -> {
      final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
      addMountLinks(defaultFSURI.getAuthority(),
          new String[]{HDFS_USER_FOLDER, LOCAL_FOLDER},
          new String[]{hdfsTargetPath.toUri().toString(),
              localTargetDir.toURI().toString()},
          conf);
      try (FileSystem fs = FileSystem.get(conf)) {
        fs.listStatus(new Path("/nonMount"));
        fail("It should fail as no mount link with /nonMount");
      }
    });
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   * Check that "viewfs:/" paths without authority can work when the
   * default mount table name is set correctly.
   */
  @Test
  public void testAccessViewFsPathWithoutAuthority() throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString() },
        conf);

    // /HDFSUser/test
    Path hdfsDir = new Path(HDFS_USER_FOLDER, "test");
    // /local/test
    Path localDir = new Path(LOCAL_FOLDER, "test");
    FileStatus[] expectedStatus;

    try (FileSystem fs = FileSystem.get(conf)) {
      fs.mkdirs(hdfsDir); // /HDFSUser/test
      fs.mkdirs(localDir); // /local/test
      expectedStatus = fs.listStatus(new Path("/"));
    }

    // check for viewfs path without authority
    Path viewFsRootPath = new Path("viewfs:/");
    LambdaTestUtils.intercept(IOException.class,
        "Empty Mount table in config for viewfs://default", () -> {
          viewFsRootPath.getFileSystem(conf);
        });

    // set the name of the default mount table here and
    // subsequent calls should succeed.
    conf.set(Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE_NAME_KEY,
        defaultFSURI.getAuthority());

    try (FileSystem fs = viewFsRootPath.getFileSystem(conf)) {
      FileStatus[] status = fs.listStatus(viewFsRootPath);
      // compare only the final components of the paths as
      // full paths have different schemes (hdfs:/ vs. viewfs:/).
      List<String> expectedPaths = Arrays.stream(expectedStatus)
          .map(s -> s.getPath().getName()).sorted()
          .collect(Collectors.toList());
      List<String> paths = Arrays.stream(status)
          .map(s -> s.getPath().getName()).sorted()
          .collect(Collectors.toList());
      assertEquals(expectedPaths, paths);
    }
  }

  /**
   * Create mount links as follows hdfs://localhost:xxx/HDFSUser -->
   * hdfs://localhost:xxx/HDFSUser/ hdfs://localhost:xxx/local -->
   * file://TEST_ROOT_DIR/root/ fallback --> hdfs://localhost:xxx/HDFSUser/
   * Creating file or directory at non root level should succeed with fallback
   * links.
   */
  @Test
  @Timeout(value = 30)
  public void testWithLinkFallBack() throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
            Constants.CONFIG_VIEWFS_LINK_FALLBACK },
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString(),
            hdfsTargetPath.toUri().toString() },
        conf);

    try (FileSystem fs = FileSystem.get(conf)) {
      fs.createNewFile(new Path("/nonMount/myfile"));
      FileStatus[] ls = fs.listStatus(new Path("/nonMount"));
      assertEquals(1, ls.length);
      // JUnit 5 assertEquals takes the expected value first.
      assertEquals("myfile",
          Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).getName());
    }
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   *
   * It cannot find any mount link. ViewFS expects a mount point from root.
   */
  @Test
  @Timeout(value = 30)
  public void testCreateOnRoot() throws Exception {
    testCreateOnRoot(false);
  }

  public void testCreateOnRoot(boolean fallbackExist) throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER},
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString()}, conf);
    try (FileSystem fs = FileSystem.get(conf)) {
      if (fallbackExist) {
        assertTrue(fs.createNewFile(new Path("/newFileOnRoot")));
      } else {
        LambdaTestUtils.intercept(NotInMountpointException.class, () -> {
          fs.createNewFile(new Path("/newFileOnRoot"));
        });
      }
    }
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   * fallback --> hdfs://localhost:xxx/HDFSUser/
   *
   * Note: Above links created because to make fs initialization success.
   * Otherwise will not proceed if no mount links.
   *
   * Unset fs.viewfs.overload.scheme.target.hdfs.impl property.
   * So, OverloadScheme target fs initialization will fail.
   */
  @Test
  @Timeout(value = 30)
  public void testInvalidOverloadSchemeTargetFS() throws Exception {
    assertThrows(IOException.class, () -> {
      final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
      String mountTableIfSet = conf.get(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH);
      conf = new Configuration();
      if (mountTableIfSet != null) {
        conf.set(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH, mountTableIfSet);
      }
      addMountLinks(defaultFSURI.getHost(),
          new String[]{HDFS_USER_FOLDER, LOCAL_FOLDER,
              Constants.CONFIG_VIEWFS_LINK_FALLBACK},
          new String[]{hdfsTargetPath.toUri().toString(),
              localTargetDir.toURI().toString(),
              hdfsTargetPath.toUri().toString()},
          conf);
      conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
          defaultFSURI.toString());
      conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
          ViewFileSystemOverloadScheme.class.getName());
      conf.unset(String.format(
          FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
          HDFS_SCHEME));
      try (FileSystem fs = FileSystem.get(conf)) {
        fs.createNewFile(new Path("/onRootWhenFallBack"));
        fail("OverloadScheme target fs should be valid.");
      }
    });

  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
   *
   * It should be able to create file using ViewFileSystemOverloadScheme.
   */
  @Test
  @Timeout(value = 30)
  public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
      throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString(), },
        conf);
    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
    try (FileSystem fs = FileSystem.get(conf)) {
      Path testFile = new Path(HDFS_USER_FOLDER + "/testFile");
      fs.createNewFile(testFile);
      assertTrue(fs.exists(testFile));
    }
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
   *
   * 1. With cache, only one hdfs child file system instance should be there.
   * 2. Without cache, there should 2 hdfs instances.
   */
  @Test
  @Timeout(value = 30)
  public void testViewFsOverloadSchemeWithInnerCache()
      throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER + 0, HDFS_USER_FOLDER + 1 },
        new String[] {hdfsTargetPath.toUri().toString(),
            hdfsTargetPath.toUri().toString() },
        conf);

    // 1. Only 1 hdfs child file system should be there with cache.
    try (FileSystem vfs = FileSystem.get(conf)) {
      assertEquals(1, vfs.getChildFileSystems().length);
    }

    // 2. Two hdfs file systems should be there if no cache.
    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
    try (FileSystem vfs = FileSystem.get(conf)) {
      assertEquals(isFallBackExist(conf) ? 3 : 2,
          vfs.getChildFileSystems().length);
    }
  }

  // HDFS-15529: if any extended tests added fallback, then getChildFileSystems
  // will include fallback as well.
  private boolean isFallBackExist(Configuration config) {
    return config.get(ConfigUtil.getConfigViewFsPrefix(defaultFSURI
        .getAuthority()) + "." + Constants.CONFIG_VIEWFS_LINK_FALLBACK) != null;
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
   * hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
   *
   * When InnerCache disabled, all matching ViewFileSystemOverloadScheme
   * initialized scheme file systems would not use FileSystem cache.
   */
  @Test
  @Timeout(value = 30)
  public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
      throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {HDFS_USER_FOLDER + 0, HDFS_USER_FOLDER + 1 },
        new String[] {hdfsTargetPath.toUri().toString(),
            hdfsTargetPath.toUri().toString() },
        conf);

    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
    // Two hdfs file systems should be there if no cache.
    try (FileSystem vfs = FileSystem.get(conf)) {
      assertEquals(isFallBackExist(conf) ? 3 : 2,
          vfs.getChildFileSystems().length);
    }
  }

  /**
   * Create mount links as follows
   * hdfs://localhost:xxx/local0 --> file://localPath/
   * hdfs://localhost:xxx/local1 --> file://localPath/
   *
   * When InnerCache disabled, all non matching ViewFileSystemOverloadScheme
   * initialized scheme file systems should continue to take advantage of
   * FileSystem cache.
   */
  @Test
  @Timeout(value = 30)
  public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
      throws Exception {
    final Path localTargetPath = new Path(localTargetDir.toURI());
    addMountLinks(defaultFSURI.getAuthority(),
        new String[] {LOCAL_FOLDER + 0, LOCAL_FOLDER + 1 },
        new String[] {localTargetPath.toUri().toString(),
            localTargetPath.toUri().toString() },
        conf);

    // Only one local file system should be there if no InnerCache, but fs
    // cache should work.
    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
    try (FileSystem vfs = FileSystem.get(conf)) {
      assertEquals(isFallBackExist(conf) ? 2 : 1,
          vfs.getChildFileSystems().length);
    }
  }

  /**
   * Tests the rename with nfly mount link.
   */
  @Test
  @Timeout(value = 30)
  public void testNflyRename() throws Exception {
    final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
    final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
    final URI uri1 = hdfsTargetPath1.toUri();
    final URI uri2 = hdfsTargetPath2.toUri();
    final Path nflyRoot = new Path("/nflyroot");

    final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
        + ".minReplication=2." + nflyRoot.toString();
    addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
        new String[] {uri1.toString() + "," + uri2.toString() }, conf);

    final Path testDir = new Path("/nflyroot/testdir1/sub1/sub3");
    final Path testDirTmp = new Path("/nflyroot/testdir1/sub1/sub3_temp");
    // Close the nfly view before verifying against the individual targets so
    // we never hold two handles to the same cached instance.
    try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) {
      assertTrue(nfly.mkdirs(testDir), testDir + ": Failed to create!");

      // Test renames
      assertTrue(nfly.rename(testDir, testDirTmp));
      assertTrue(nfly.rename(testDirTmp, testDir));
    }

    final URI[] testUris = new URI[] {uri1, uri2 };
    for (final URI testUri : testUris) {
      try (FileSystem fs = FileSystem.get(testUri, conf)) {
        assertTrue(fs.exists(testDir), testDir + " should exist!");
      }
    }
  }

  /**
   * Tests the write and read contents with nfly mount link.
   */
  @Test
  @Timeout(value = 30)
  public void testNflyWriteRead() throws Exception {
    final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
    final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
    final URI uri1 = hdfsTargetPath1.toUri();
    final URI uri2 = hdfsTargetPath2.toUri();
    final Path nflyRoot = new Path("/nflyroot");
    final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
        + ".minReplication=2." + nflyRoot.toString();
    addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
        new String[] {uri1.toString() + "," + uri2.toString() }, conf);
    final Path testFile = new Path("/nflyroot/test.txt");
    // Close the nfly view before reading through the per-target handles so
    // we never hold two handles to the same cached instance.
    try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) {
      writeString(nfly, TEST_STRING, testFile);
    }
    final URI[] testUris = new URI[] {uri1, uri2 };
    for (final URI testUri : testUris) {
      try (FileSystem fs = FileSystem.get(testUri, conf)) {
        readString(fs, testFile, TEST_STRING, testUri);
      }
    }
  }

  /**
   * 1. Writes contents with nfly link having two target uris. 2. Deletes one
   * target file. 3. Tests the read works with repairOnRead flag. 4. Tests that
   * previously deleted file fully recovered and exists.
   */
  @Test
  @Timeout(value = 30)
  public void testNflyRepair() throws Exception {
    final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.repairOnRead;
    final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
    final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
    final URI uri1 = hdfsTargetPath1.toUri();
    final URI uri2 = hdfsTargetPath2.toUri();
    final Path nflyRoot = new Path("/nflyroot");
    final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
        + ".minReplication=2," + repairKey + "=true." + nflyRoot.toString();
    addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
        new String[] {uri1.toString() + "," + uri2.toString() }, conf);
    try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) {
      // write contents to nfly
      final Path testFilePath = new Path("/nflyroot/test.txt");
      writeString(nfly, TEST_STRING, testFilePath);

      final URI[] testUris = new URI[] {uri1, uri2 };
      // both nodes are up again, test repair
      FsGetter getter = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs");
      try (FileSystem fs1 = getter.getNewInstance(testUris[0], conf)) {
        // Delete a file from one target URI
        String testFile = "/test.txt";
        assertTrue(
            fs1.delete(new Path(testUris[0].toString() + testFile), false));
        assertFalse(fs1.exists(new Path(testUris[0].toString() + testFile)));

        // Verify read success.
        readString(nfly, testFilePath, TEST_STRING, testUris[0]);
        // Verify file recovered.
        assertTrue(fs1.exists(new Path(testUris[0].toString() + testFile)));
      }
    }
  }

  /**
   * Tests that the fs initialization should ignore the port number when it's
   * extracting the mount table name from uri.
   */
  @Test
  @Timeout(value = 30)
  public void testMountTableNameShouldIgnorePortFromURI() throws Exception {
    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
    conf = new Configuration(getConf());
    addMountLinks(defaultFSURI.getHost(),
        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
            Constants.CONFIG_VIEWFS_LINK_FALLBACK},
        new String[] {hdfsTargetPath.toUri().toString(),
            localTargetDir.toURI().toString(),
            hdfsTargetPath.toUri().toString()}, conf);
    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
        defaultFSURI.toString());
    conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
        ViewFileSystemOverloadScheme.class.getName());
    conf.set(String
        .format(FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
            HDFS_SCHEME), DistributedFileSystem.class.getName());
    conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME, true);

    Path testDirOnRoot = new Path("/test");
    URI uriWithoutPort = new URI("hdfs://" + defaultFSURI.getHost());
    //Initialize with out port
    try (FileSystem fs = FileSystem
        .get(uriWithoutPort, conf)) {
      fs.mkdirs(testDirOnRoot);
      fs.delete(testDirOnRoot, true);
    }

    //Initialize with port
    try (FileSystem fs = FileSystem.get(defaultFSURI, conf)) {
      fs.mkdirs(testDirOnRoot);
      fs.delete(testDirOnRoot, true);
    }
  }

  /**
   * Writes the given string to the file via writeUTF.
   */
  private void writeString(final FileSystem nfly, final String testString,
      final Path testFile) throws IOException {
    try (FSDataOutputStream fsDos = nfly.create(testFile)) {
      fsDos.writeUTF(testString);
    }
  }

  /**
   * Reads the file via readUTF and asserts its content equals testString.
   * testUri identifies the target being read in the failure message.
   */
  private void readString(final FileSystem nfly, final Path testFile,
      final String testString, final URI testUri) throws IOException {
    try (FSDataInputStream fsDis = nfly.open(testFile)) {
      assertEquals(testString, fsDis.readUTF(),
          "Wrong file content in " + testUri);
    }
  }

  /**
   * @return configuration.
   */
  public Configuration getConf() {
    return this.conf;
  }

  /**
   * @return configuration.
   */
  public Configuration getNewConf() {
    return new Configuration(cluster.getConfiguration(0));
  }

  /**
   * sets configuration.
   */
  public void setConf(Configuration config) {
    conf = config;
  }
}