BaseTestHttpFSWith.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.http.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.http.server.HttpFSAuthenticationFilter;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.TestDir;
import org.apache.hadoop.test.TestDirHelper;
import org.apache.hadoop.test.TestHdfs;
import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.test.TestJetty;
import org.apache.hadoop.test.TestJettyHelper;
import org.apache.hadoop.util.Lists;

import org.json.simple.JSONObject;
import org.json.simple.parser.ContainerFactory;
import org.json.simple.parser.JSONParser;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

@RunWith(value = Parameterized.class)
public abstract class BaseTestHttpFSWith extends HFSTestCase {
  protected abstract Path getProxiedFSTestDir();

  protected abstract String getProxiedFSURI();

  protected abstract Configuration getProxiedFSConf();

  protected boolean isLocalFS() {
    return getProxiedFSURI().startsWith("file://");
  }

  private void createHttpFSServer() throws Exception {
    File homeDir = TestDirHelper.getTestDir();
    assertTrue(new File(homeDir, "conf").mkdir());
    assertTrue(new File(homeDir, "log").mkdir());
    assertTrue(new File(homeDir, "temp").mkdir());
    HttpFSServerWebApp.setHomeDirForCurrentThread(homeDir.getAbsolutePath());

    File secretFile = new File(new File(homeDir, "conf"), "secret");
    Writer w = new FileWriter(secretFile);
    w.write("secret");
    w.close();

    //FileSystem being served by HttpFS
    String fsDefaultName = getProxiedFSURI();
    Configuration conf = new Configuration(false);
    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
    // For BaseTestHttpFSWith#testFileAclsCustomizedUserAndGroupNames
    conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
        "^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$");
    conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_KEY,
        "^(default:)?(user|group|mask|other):" +
            "[[0-9A-Za-z_][@A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?" +
            "(user|group|mask|other):[[0-9A-Za-z_][@A-Za-z0-9._-]]*:" +
            "([rwx-]{3})?)*$");
    File hdfsSite = new File(new File(homeDir, "conf"), "hdfs-site.xml");
    OutputStream os = new FileOutputStream(hdfsSite);
    conf.writeXml(os);
    os.close();

    //HTTPFS configuration
    conf = new Configuration(false);
    conf.set("httpfs.proxyuser." + HadoopUsersConfTestHelper.getHadoopProxyUser() + ".groups",
             HadoopUsersConfTestHelper.getHadoopProxyUserGroups());
    conf.set("httpfs.proxyuser." + HadoopUsersConfTestHelper.getHadoopProxyUser() + ".hosts",
             HadoopUsersConfTestHelper.getHadoopProxyUserHosts());
    conf.set(HttpFSAuthenticationFilter.HADOOP_HTTP_CONF_PREFIX +
        AuthenticationFilter.SIGNATURE_SECRET_FILE, secretFile.getAbsolutePath());
    File httpfsSite = new File(new File(homeDir, "conf"), "httpfs-site.xml");
    os = new FileOutputStream(httpfsSite);
    conf.writeXml(os);
    os.close();

    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    URL url = cl.getResource("webapp");
    WebAppContext context = new WebAppContext(url.getPath(), "/webhdfs");
    Server server = TestJettyHelper.getJettyServer();
    server.setHandler(context);
    server.start();
  }

  protected Class getFileSystemClass() {
    return HttpFSFileSystem.class;
  }

  protected String getScheme() {
    return "webhdfs";
  }

  protected FileSystem getHttpFSFileSystem(Configuration conf) throws
      Exception {
    conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
    URI uri = new URI(getScheme() + "://" +
                      TestJettyHelper.getJettyURL().toURI().getAuthority());
    return FileSystem.get(uri, conf);
  }

  protected FileSystem getHttpFSFileSystem() throws Exception {
    Configuration conf = new Configuration();
    return getHttpFSFileSystem(conf);
  }

  protected void testGet() throws Exception {
    FileSystem fs = getHttpFSFileSystem();
    assertNotNull(fs);
    URI uri = new URI(getScheme() + "://" +
                      TestJettyHelper.getJettyURL().toURI().getAuthority());
    assertEquals(fs.getUri(), uri);
    fs.close();
  }

  private void testOpen() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    Path path = new Path(getProxiedFSTestDir(), "foo.txt");
    OutputStream os = fs.create(path);
    os.write(1);
    os.close();
    fs.close();
    fs = getHttpFSFileSystem();
    InputStream is = fs.open(new Path(path.toUri().getPath()));
    assertEquals(is.read(), 1);
    is.close();
    fs.close();
  }

  private void testCreate(Path path, boolean override) throws Exception {
    FileSystem fs = getHttpFSFileSystem();
    FsPermission permission = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
    OutputStream os = fs.create(new Path(path.toUri().getPath()), permission, override, 1024,
                                (short) 2, 100 * 1024 * 1024, null);
    os.write(1);
    os.close();
    fs.close();

    fs = FileSystem.get(getProxiedFSConf());
    FileStatus status = fs.getFileStatus(path);
    if (!isLocalFS()) {
      assertEquals(status.getReplication(), 2);
      assertEquals(status.getBlockSize(), 100 * 1024 * 1024);
    }
    assertEquals(status.getPermission(), permission);
    InputStream is = fs.open(path);
    assertEquals(is.read(), 1);
    is.close();
    fs.close();
  }

  private void testCreate() throws Exception {
    Path path = new Path(getProxiedFSTestDir(), "foo.txt");
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    fs.delete(path, true);
    testCreate(path, false);
    testCreate(path, true);
    try {
      testCreate(path, false);
      Assert.fail("the create should have failed because the file exists " +
                  "and override is FALSE");
    } catch (IOException ex) {
      System.out.println("#");
    } catch (Exception ex) {
      Assert.fail(ex.toString());
    }
  }

  private void testAppend() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      fs.close();
      fs = getHttpFSFileSystem();
      os = fs.append(new Path(path.toUri().getPath()));
      os.write(2);
      os.close();
      fs.close();
      fs = FileSystem.get(getProxiedFSConf());
      InputStream is = fs.open(path);
      assertEquals(is.read(), 1);
      assertEquals(is.read(), 2);
      assertEquals(is.read(), -1);
      is.close();
      fs.close();
    }
  }

  private void testTruncate() throws Exception {
    if (!isLocalFS()) {
      final short repl = 3;
      final int blockSize = 1024;
      final int numOfBlocks = 2;
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path file = new Path(getProxiedFSTestDir(), "foo.txt");
      final byte[] data = FileSystemTestHelper.getFileData(
          numOfBlocks, blockSize);
      FileSystemTestHelper.createFile(fs, file, data, blockSize, repl);

      final int newLength = blockSize;

      boolean isReady = fs.truncate(file, newLength);
      assertTrue("Recovery is not expected.", isReady);

      FileStatus fileStatus = fs.getFileStatus(file);
      assertEquals(fileStatus.getLen(), newLength);
      AppendTestUtil.checkFullFile(fs, file, newLength, data, file.toString());

      fs.close();
      assertPathCapabilityForTruncate(file);
    }
  }

  private void assertPathCapabilityForTruncate(Path file) throws Exception {
    FileSystem fs = this.getHttpFSFileSystem();
    assertTrue("HttpFS/WebHdfs/SWebHdfs support truncate",
        fs.hasPathCapability(file, CommonPathCapabilities.FS_TRUNCATE));
    fs.close();
  }

  private void testConcat() throws Exception {
    Configuration config = getProxiedFSConf();
    config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(config);
      fs.mkdirs(getProxiedFSTestDir());
      Path path1 = new Path("/test/foo.txt");
      Path path2 = new Path("/test/bar.txt");
      Path path3 = new Path("/test/derp.txt");
      DFSTestUtil.createFile(fs, path1, 1024, (short) 3, 0);
      DFSTestUtil.createFile(fs, path2, 1024, (short) 3, 0);
      DFSTestUtil.createFile(fs, path3, 1024, (short) 3, 0);
      fs.close();
      fs = getHttpFSFileSystem();
      fs.concat(path1, new Path[]{path2, path3});
      fs.close();
      fs = FileSystem.get(config);
      assertTrue(fs.exists(path1));
      assertFalse(fs.exists(path2));
      assertFalse(fs.exists(path3));
      fs.close();
    }
  }

  private void testRename() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    Path path = new Path(getProxiedFSTestDir(), "foo");
    fs.mkdirs(path);
    fs.close();
    fs = getHttpFSFileSystem();
    Path oldPath = new Path(path.toUri().getPath());
    Path newPath = new Path(path.getParent(), "bar");
    fs.rename(oldPath, newPath);
    fs.close();
    fs = FileSystem.get(getProxiedFSConf());
    assertFalse(fs.exists(oldPath));
    assertTrue(fs.exists(newPath));
    fs.close();
  }

  private void testDelete() throws Exception {
    Path foo = new Path(getProxiedFSTestDir(), "foo");
    Path bar = new Path(getProxiedFSTestDir(), "bar");
    Path foe = new Path(getProxiedFSTestDir(), "foe");
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    fs.mkdirs(foo);
    fs.mkdirs(new Path(bar, "a"));
    fs.mkdirs(foe);

    FileSystem hoopFs = getHttpFSFileSystem();
    assertTrue(hoopFs.delete(new Path(foo.toUri().getPath()), false));
    assertFalse(fs.exists(foo));
    try {
      hoopFs.delete(new Path(bar.toUri().getPath()), false);
      Assert.fail();
    } catch (IOException ex) {
    } catch (Exception ex) {
      Assert.fail();
    }
    assertTrue(fs.exists(bar));
    assertTrue(hoopFs.delete(new Path(bar.toUri().getPath()), true));
    assertFalse(fs.exists(bar));

    assertTrue(fs.exists(foe));
    assertTrue(hoopFs.delete(foe, true));
    assertFalse(fs.exists(foe));

    hoopFs.close();
    fs.close();
  }

  private void testListSymLinkStatus() throws Exception {
    if (isLocalFS()) {
      // do not test the the symlink for local FS.
      return;
    }
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    boolean isWebhdfs = fs instanceof WebHdfsFileSystem;
    Path path =
        new Path(getProxiedFSTestDir() + "-symlink", "targetFoo.txt");
    OutputStream os = fs.create(path);
    os.write(1);
    os.close();
    Path linkPath =
        new Path(getProxiedFSTestDir()+ "-symlink", "symlinkFoo.txt");
    fs.createSymlink(path, linkPath, false);
    fs = getHttpFSFileSystem();
    FileStatus linkStatus = fs.getFileStatus(linkPath);
    FileStatus status1 = fs.getFileStatus(path);

    FileStatus[] stati = fs.listStatus(path.getParent());
    assertEquals(2, stati.length);

    int countSymlink = 0;
    for (int i = 0; i < stati.length; i++) {
      FileStatus fStatus = stati[i];
      countSymlink += fStatus.isSymlink() ? 1 : 0;
    }
    assertEquals(1, countSymlink);

    assertFalse(status1.isSymlink());
    if (isWebhdfs) {
      assertTrue(linkStatus.isSymlink());
    }
    fs.close();
  }

  private void testListStatus() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    boolean isDFS = fs instanceof DistributedFileSystem;
    Path path = new Path(getProxiedFSTestDir(), "foo.txt");
    OutputStream os = fs.create(path);
    os.write(1);
    os.close();
    FileStatus status1 = fs.getFileStatus(path);
    fs.close();

    fs = getHttpFSFileSystem();
    FileStatus status2 = fs.getFileStatus(new Path(path.toUri().getPath()));
    fs.close();

    assertEquals(status2.getPermission(), status1.getPermission());
    assertEquals(status2.getPath().toUri().getPath(),
        status1.getPath().toUri().getPath());
    assertEquals(status2.getReplication(), status1.getReplication());
    assertEquals(status2.getBlockSize(), status1.getBlockSize());
    assertEquals(status2.getAccessTime(), status1.getAccessTime());
    assertEquals(status2.getModificationTime(), status1.getModificationTime());
    assertEquals(status2.getOwner(), status1.getOwner());
    assertEquals(status2.getGroup(), status1.getGroup());
    assertEquals(status2.getLen(), status1.getLen());
    if (isDFS && status2 instanceof HdfsFileStatus) {
      assertTrue(status1 instanceof HdfsFileStatus);
      HdfsFileStatus hdfsFileStatus1 = (HdfsFileStatus) status1;
      HdfsFileStatus hdfsFileStatus2 = (HdfsFileStatus) status2;
      // Check HDFS-specific fields
      assertEquals(hdfsFileStatus2.getChildrenNum(),
          hdfsFileStatus1.getChildrenNum());
      assertEquals(hdfsFileStatus2.getFileId(),
          hdfsFileStatus1.getFileId());
      assertEquals(hdfsFileStatus2.getStoragePolicy(),
          hdfsFileStatus1.getStoragePolicy());
    }

    FileStatus[] stati = fs.listStatus(path.getParent());
    assertEquals(1, stati.length);
    assertEquals(stati[0].getPath().getName(), path.getName());

    // The full path should be the path to the file. See HDFS-12139
    FileStatus[] statl = fs.listStatus(path);
    Assert.assertEquals(1, statl.length);
    Assert.assertEquals(status2.getPath(), statl[0].getPath());
    Assert.assertEquals(statl[0].getPath().getName(), path.getName());
    Assert.assertEquals(stati[0].getPath(), statl[0].getPath());
  }

  private void testFileStatusAttr() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory
      Path path = new Path("/tmp/tmp-snap-test");
      DistributedFileSystem distributedFs = (DistributedFileSystem) FileSystem
          .get(path.toUri(), this.getProxiedFSConf());
      distributedFs.mkdirs(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      assertFalse("Snapshot should be disallowed by default",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Allow snapshot
      distributedFs.allowSnapshot(path);
      // Check FileStatus
      assertTrue("Snapshot enabled bit is not set in FileStatus",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Disallow snapshot
      distributedFs.disallowSnapshot(path);
      // Check FileStatus
      assertFalse("Snapshot enabled bit is not cleared in FileStatus",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup
      fs.delete(path, true);
      fs.close();
      distributedFs.close();
    }
  }

  private static void assertSameListing(FileSystem expected, FileSystem
      actual, Path p) throws IOException {
    // Consume all the entries from both iterators
    RemoteIterator<FileStatus> exIt = expected.listStatusIterator(p);
    List<FileStatus> exStatuses = new ArrayList<>();
    while (exIt.hasNext()) {
      exStatuses.add(exIt.next());
    }
    RemoteIterator<FileStatus> acIt = actual.listStatusIterator(p);
    List<FileStatus> acStatuses = new ArrayList<>();
    while (acIt.hasNext()) {
      acStatuses.add(acIt.next());
    }
    assertEquals(exStatuses.size(), acStatuses.size());
    for (int i = 0; i < exStatuses.size(); i++) {
      FileStatus expectedStatus = exStatuses.get(i);
      FileStatus actualStatus = acStatuses.get(i);
      // Path URIs are fully qualified, so compare just the path component
      assertEquals(expectedStatus.getPath().toUri().getPath(),
          actualStatus.getPath().toUri().getPath());
    }
  }

  private void testListStatusBatch() throws Exception {
    // LocalFileSystem writes checksum files next to the data files, which
    // show up when listing via LFS. This makes the listings not compare
    // properly.
    Assume.assumeFalse(isLocalFS());

    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
    FileSystem httpFs = getHttpFSFileSystem(conf);

    // Test an empty directory
    Path dir = new Path(getProxiedFSTestDir(), "dir");
    proxyFs.mkdirs(dir);
    assertSameListing(proxyFs, httpFs, dir);
    // Create and test in a loop
    for (int i = 0; i < 10; i++) {
      proxyFs.create(new Path(dir, "file" + i)).close();
      assertSameListing(proxyFs, httpFs, dir);
    }

    // Test for HDFS-12139
    Path dir1 = new Path(getProxiedFSTestDir(), "dir1");
    proxyFs.mkdirs(dir1);
    Path file1 = new Path(dir1, "file1");
    proxyFs.create(file1).close();

    RemoteIterator<FileStatus> si = proxyFs.listStatusIterator(dir1);
    FileStatus statusl = si.next();
    FileStatus status = proxyFs.getFileStatus(file1);
    Assert.assertEquals(file1.getName(), statusl.getPath().getName());
    Assert.assertEquals(status.getPath(), statusl.getPath());

    si = proxyFs.listStatusIterator(file1);
    statusl = si.next();
    Assert.assertEquals(file1.getName(), statusl.getPath().getName());
    Assert.assertEquals(status.getPath(), statusl.getPath());
  }

  private void testWorkingdirectory() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    Path workingDir = fs.getWorkingDirectory();
    fs.close();

    fs = getHttpFSFileSystem();
    if (isLocalFS()) {
      fs.setWorkingDirectory(workingDir);
    }
    Path httpFSWorkingDir = fs.getWorkingDirectory();
    fs.close();
    assertEquals(httpFSWorkingDir.toUri().getPath(),
                        workingDir.toUri().getPath());

    fs = getHttpFSFileSystem();
    fs.setWorkingDirectory(new Path("/tmp"));
    workingDir = fs.getWorkingDirectory();
    assertEquals(workingDir.toUri().getPath(),
        new Path("/tmp").toUri().getPath());
    final FileSystem httpFs = getHttpFSFileSystem();
    LambdaTestUtils.intercept(IllegalArgumentException.class,
        "Invalid DFS directory name /foo:bar",
        () -> httpFs.setWorkingDirectory(new Path("/foo:bar")));
    fs.setWorkingDirectory(new Path("/bar"));
    workingDir = fs.getWorkingDirectory();
    httpFs.close();
    fs.close();
    assertEquals(workingDir.toUri().getPath(),
        new Path("/bar").toUri().getPath());
  }

  private void testTrashRoot() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());

      final Path rootDir = new Path("/");
      final Path fooPath = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(fooPath);
      os.write(1);
      os.close();

      Path trashPath = fs.getTrashRoot(rootDir);
      Path fooTrashPath = fs.getTrashRoot(fooPath);
      fs.close();

      fs = getHttpFSFileSystem();
      Path httpFSTrashPath = fs.getTrashRoot(rootDir);
      Path httpFSFooTrashPath = fs.getTrashRoot(fooPath);
      fs.close();

      assertEquals(trashPath.toUri().getPath(),
          httpFSTrashPath.toUri().getPath());
      assertEquals(fooTrashPath.toUri().getPath(),
          httpFSFooTrashPath.toUri().getPath());
      // trash path is related to USER, not path
      assertEquals(trashPath.toUri().getPath(),
          fooTrashPath.toUri().getPath());
    }
  }

  private void testMkdirs() throws Exception {
    Path path = new Path(getProxiedFSTestDir(), "foo");
    FileSystem fs = getHttpFSFileSystem();
    fs.mkdirs(path);
    fs.close();
    fs = FileSystem.get(getProxiedFSConf());
    assertTrue(fs.exists(path));
    fs.close();
  }

  private void testSetTimes() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      FileStatus status1 = fs.getFileStatus(path);
      fs.close();
      long at = status1.getAccessTime();
      long mt = status1.getModificationTime();

      fs = getHttpFSFileSystem();
      fs.setTimes(path, mt - 10, at - 20);
      fs.close();

      fs = FileSystem.get(getProxiedFSConf());
      status1 = fs.getFileStatus(path);
      fs.close();
      long atNew = status1.getAccessTime();
      long mtNew = status1.getModificationTime();
      assertEquals(mtNew, mt - 10);
      assertEquals(atNew, at - 20);
    }
  }

  protected void testSetPermission() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    Path path = new Path(getProxiedFSTestDir(), "foodir");
    fs.mkdirs(path);

    fs = getHttpFSFileSystem();
    FsPermission permission1 = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
    fs.setPermission(path, permission1);
    fs.close();

    fs = FileSystem.get(getProxiedFSConf());
    FileStatus status1 = fs.getFileStatus(path);
    fs.close();
    FsPermission permission2 = status1.getPermission();
    assertEquals(permission2, permission1);

    //sticky bit
    fs = getHttpFSFileSystem();
    permission1 = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE, true);
    fs.setPermission(path, permission1);
    fs.close();

    fs = FileSystem.get(getProxiedFSConf());
    status1 = fs.getFileStatus(path);
    fs.close();
    permission2 = status1.getPermission();
    assertTrue(permission2.getStickyBit());
    assertEquals(permission2, permission1);
  }

  private void testSetOwner() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      fs.close();

      fs = getHttpFSFileSystem();
      String user = HadoopUsersConfTestHelper.getHadoopUsers()[1];
      String group = HadoopUsersConfTestHelper.getHadoopUserGroups(user)[0];
      fs.setOwner(path, user, group);
      fs.close();

      fs = FileSystem.get(getProxiedFSConf());
      FileStatus status1 = fs.getFileStatus(path);
      fs.close();
      assertEquals(status1.getOwner(), user);
      assertEquals(status1.getGroup(), group);
    }
  }

  private void testSetReplication() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    Path path = new Path(getProxiedFSTestDir(), "foo.txt");
    OutputStream os = fs.create(path);
    os.write(1);
    os.close();
    fs.setReplication(path, (short) 2);
    fs.close();

    fs = getHttpFSFileSystem();
    fs.setReplication(path, (short) 1);
    fs.close();

    fs = FileSystem.get(getProxiedFSConf());
    FileStatus status1 = fs.getFileStatus(path);
    fs.close();
    assertEquals(status1.getReplication(), (short) 1);
  }

  private void testChecksum() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      FileChecksum hdfsChecksum = fs.getFileChecksum(path);
      fs.close();
      fs = getHttpFSFileSystem();
      FileChecksum httpChecksum = fs.getFileChecksum(path);
      fs.close();
      assertEquals(httpChecksum.getAlgorithmName(),
          hdfsChecksum.getAlgorithmName());
      assertEquals(httpChecksum.getLength(), hdfsChecksum.getLength());
      assertArrayEquals(httpChecksum.getBytes(), hdfsChecksum.getBytes());
    }
  }

  private void testContentSummary() throws Exception {
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    Path path = new Path(getProxiedFSTestDir(), "foo.txt");
    OutputStream os = fs.create(path);
    os.write(1);
    os.close();
    ContentSummary hdfsContentSummary = fs.getContentSummary(path);
    fs.close();
    fs = getHttpFSFileSystem();
    ContentSummary httpContentSummary = fs.getContentSummary(path);
    fs.close();
    assertEquals(hdfsContentSummary.getDirectoryCount(),
        httpContentSummary.getDirectoryCount());
    assertEquals(hdfsContentSummary.getErasureCodingPolicy(),
        httpContentSummary.getErasureCodingPolicy());
    assertEquals(hdfsContentSummary.getFileCount(),
        httpContentSummary.getFileCount());
    assertEquals(hdfsContentSummary.getLength(),
        httpContentSummary.getLength());
    assertEquals(hdfsContentSummary.getQuota(), httpContentSummary.getQuota());
    assertEquals(hdfsContentSummary.getSpaceConsumed(),
        httpContentSummary.getSpaceConsumed());
    assertEquals(hdfsContentSummary.getSpaceQuota(),
        httpContentSummary.getSpaceQuota());
  }

  private void testQuotaUsage() throws Exception {
    if (isLocalFS()) {
      // LocalFS doesn't support setQuota so skip here
      return;
    }

    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(getProxiedFSConf());
    Path path = new Path(getProxiedFSTestDir(), "foo");
    dfs.mkdirs(path);
    dfs.setQuota(path, 20, 600 * 1024 * 1024);
    for (int i = 0; i < 10; i++) {
      dfs.createNewFile(new Path(path, "test_file_" + i));
    }
    FSDataOutputStream out = dfs.create(new Path(path, "test_file"));
    out.writeUTF("Hello World");
    out.close();

    dfs.setQuotaByStorageType(path, StorageType.SSD, 100000);
    dfs.setQuotaByStorageType(path, StorageType.DISK, 200000);

    QuotaUsage hdfsQuotaUsage = dfs.getQuotaUsage(path);
    dfs.close();
    FileSystem fs = getHttpFSFileSystem();
    QuotaUsage httpQuotaUsage = fs.getQuotaUsage(path);
    fs.close();
    assertEquals(hdfsQuotaUsage.getFileAndDirectoryCount(),
        httpQuotaUsage.getFileAndDirectoryCount());
    assertEquals(hdfsQuotaUsage.getQuota(), httpQuotaUsage.getQuota());
    assertEquals(hdfsQuotaUsage.getSpaceConsumed(),
        httpQuotaUsage.getSpaceConsumed());
    assertEquals(hdfsQuotaUsage.getSpaceQuota(),
        httpQuotaUsage.getSpaceQuota());
    assertEquals(hdfsQuotaUsage.getTypeQuota(StorageType.SSD),
        httpQuotaUsage.getTypeQuota(StorageType.SSD));
    assertEquals(hdfsQuotaUsage.getTypeQuota(StorageType.DISK),
        httpQuotaUsage.getTypeQuota(StorageType.DISK));
  }
  
  /** Set xattr */
  private void testSetXAttr() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      fs.close();
 
      final String name1 = "user.a1";
      final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
      final String name2 = "user.a2";
      final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
      final String name3 = "user.a3";
      final byte[] value3 = null;
      final String name4 = "trusted.a1";
      final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
      final String name5 = "a1";
      fs = getHttpFSFileSystem();
      fs.setXAttr(path, name1, value1);
      fs.setXAttr(path, name2, value2);
      fs.setXAttr(path, name3, value3);
      fs.setXAttr(path, name4, value4);
      try {
        fs.setXAttr(path, name5, value1);
        Assert.fail("Set xAttr with incorrect name format should fail.");
      } catch (IOException e) {
      } catch (IllegalArgumentException e) {
      }
      fs.close();

      fs = FileSystem.get(getProxiedFSConf());
      Map<String, byte[]> xAttrs = fs.getXAttrs(path);
      fs.close();
      assertEquals(4, xAttrs.size());
      assertArrayEquals(value1, xAttrs.get(name1));
      assertArrayEquals(value2, xAttrs.get(name2));
      assertArrayEquals(new byte[0], xAttrs.get(name3));
      assertArrayEquals(value4, xAttrs.get(name4));
    }
  }

  /** Get xattrs */
  private void testGetXAttrs() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      fs.close();

      final String name1 = "user.a1";
      final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
      final String name2 = "user.a2";
      final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
      final String name3 = "user.a3";
      final byte[] value3 = null;
      final String name4 = "trusted.a1";
      final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
      fs = FileSystem.get(getProxiedFSConf());
      fs.setXAttr(path, name1, value1);
      fs.setXAttr(path, name2, value2);
      fs.setXAttr(path, name3, value3);
      fs.setXAttr(path, name4, value4);
      fs.close();

      // Get xattrs with names parameter
      fs = getHttpFSFileSystem();
      List<String> names = Lists.newArrayList();
      names.add(name1);
      names.add(name2);
      names.add(name3);
      names.add(name4);
      Map<String, byte[]> xAttrs = fs.getXAttrs(path, names);
      fs.close();
      assertEquals(4, xAttrs.size());
      assertArrayEquals(value1, xAttrs.get(name1));
      assertArrayEquals(value2, xAttrs.get(name2));
      assertArrayEquals(new byte[0], xAttrs.get(name3));
      assertArrayEquals(value4, xAttrs.get(name4));

      // Get specific xattr
      fs = getHttpFSFileSystem();
      byte[] value = fs.getXAttr(path, name1);
      assertArrayEquals(value1, value);
      final String name5 = "a1";
      try {
        value = fs.getXAttr(path, name5);
        Assert.fail("Get xAttr with incorrect name format should fail.");
      } catch (IOException e) {
      } catch (IllegalArgumentException e) {
      }
      fs.close();

      // Get all xattrs
      fs = getHttpFSFileSystem();
      xAttrs = fs.getXAttrs(path);
      fs.close();
      assertEquals(4, xAttrs.size());
      assertArrayEquals(value1, xAttrs.get(name1));
      assertArrayEquals(value2, xAttrs.get(name2));
      assertArrayEquals(new byte[0], xAttrs.get(name3));
      assertArrayEquals(value4, xAttrs.get(name4));
    }
  }

  /** Remove xattr */
  private void testRemoveXAttr() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      fs.close();

      final String name1 = "user.a1";
      final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
      final String name2 = "user.a2";
      final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
      final String name3 = "user.a3";
      final byte[] value3 = null;
      final String name4 = "trusted.a1";
      final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
      final String name5 = "a1";
      fs = FileSystem.get(getProxiedFSConf());
      fs.setXAttr(path, name1, value1);
      fs.setXAttr(path, name2, value2);
      fs.setXAttr(path, name3, value3);
      fs.setXAttr(path, name4, value4);
      fs.close();

      fs = getHttpFSFileSystem();
      fs.removeXAttr(path, name1);
      fs.removeXAttr(path, name3);
      fs.removeXAttr(path, name4);
      try {
        fs.removeXAttr(path, name5);
        Assert.fail("Remove xAttr with incorrect name format should fail.");
      } catch (IOException e) {
      } catch (IllegalArgumentException e) {
      }

      fs = FileSystem.get(getProxiedFSConf());
      Map<String, byte[]> xAttrs = fs.getXAttrs(path);
      fs.close();
      assertEquals(1, xAttrs.size());
      assertArrayEquals(value2, xAttrs.get(name2));
    }
  }

  /** List xattrs */
  private void testListXAttrs() throws Exception {
    if (!isLocalFS()) {
      FileSystem fs = FileSystem.get(getProxiedFSConf());
      fs.mkdirs(getProxiedFSTestDir());
      Path path = new Path(getProxiedFSTestDir(), "foo.txt");
      OutputStream os = fs.create(path);
      os.write(1);
      os.close();
      fs.close();

      final String name1 = "user.a1";
      final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
      final String name2 = "user.a2";
      final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
      final String name3 = "user.a3";
      final byte[] value3 = null;
      final String name4 = "trusted.a1";
      final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
      fs = FileSystem.get(getProxiedFSConf());
      fs.setXAttr(path, name1, value1);
      fs.setXAttr(path, name2, value2);
      fs.setXAttr(path, name3, value3);
      fs.setXAttr(path, name4, value4);
      fs.close();

      fs = getHttpFSFileSystem();
      List<String> names = fs.listXAttrs(path);
      assertEquals(4, names.size());
      assertTrue(names.contains(name1));
      assertTrue(names.contains(name2));
      assertTrue(names.contains(name3));
      assertTrue(names.contains(name4));
    }
  }

  /**
   * Runs assertions testing that two AclStatus objects contain the same info
   * @param a First AclStatus
   * @param b Second AclStatus
   * @throws Exception
   */
  private void assertSameAcls(AclStatus a, AclStatus b) throws Exception {
    assertEquals(a.getOwner(), b.getOwner());
    assertEquals(a.getGroup(), b.getGroup());
    assertEquals(a.getPermission(), b.getPermission());
    assertEquals(a.isStickyBit(), b.isStickyBit());
    assertEquals(a.getEntries().size(), b.getEntries().size());
    for (AclEntry e : a.getEntries()) {
      assertTrue(b.getEntries().contains(e));
    }
    for (AclEntry e : b.getEntries()) {
      assertTrue(a.getEntries().contains(e));
    }
  }

  private static void assertSameAcls(FileSystem expected, FileSystem actual,
      Path path) throws IOException {
    FileStatus expectedFileStatus = expected.getFileStatus(path);
    FileStatus actualFileStatus = actual.getFileStatus(path);
    assertEquals(actualFileStatus.hasAcl(), expectedFileStatus.hasAcl());
    // backwards compat
    assertEquals(actualFileStatus.getPermission().getAclBit(),
        expectedFileStatus.getPermission().getAclBit());
  }

  /**
   * Simple ACL tests on a file:  Set an acl, add an acl, remove one acl,
   * and remove all acls.
   * @throws Exception
   */
  private void testFileAcls() throws Exception {
    if ( isLocalFS() ) {
      return;
    }

    final String aclUser1 = "user:foo:rw-";
    final String rmAclUser1 = "user:foo:";
    final String aclUser2 = "user:bar:r--";
    final String aclGroup1 = "group::r--";
    final String aclSet = "user::rwx," + aclUser1 + ","
            + aclGroup1 + ",other::---";

    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
    FileSystem httpfs = getHttpFSFileSystem();

    Path path = new Path(getProxiedFSTestDir(), "testAclStatus.txt");
    OutputStream os = proxyFs.create(path);
    os.write(1);
    os.close();

    AclStatus proxyAclStat = proxyFs.getAclStatus(path);
    AclStatus httpfsAclStat = httpfs.getAclStatus(path);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, path);

    httpfs.setAcl(path, AclEntry.parseAclSpec(aclSet,true));
    proxyAclStat = proxyFs.getAclStatus(path);
    httpfsAclStat = httpfs.getAclStatus(path);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, path);

    httpfs.modifyAclEntries(path, AclEntry.parseAclSpec(aclUser2, true));
    proxyAclStat = proxyFs.getAclStatus(path);
    httpfsAclStat = httpfs.getAclStatus(path);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, path);

    httpfs.removeAclEntries(path, AclEntry.parseAclSpec(rmAclUser1, false));
    proxyAclStat = proxyFs.getAclStatus(path);
    httpfsAclStat = httpfs.getAclStatus(path);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, path);

    httpfs.removeAcl(path);
    proxyAclStat = proxyFs.getAclStatus(path);
    httpfsAclStat = httpfs.getAclStatus(path);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, path);
  }

  /**
   * Simple acl tests on a directory: set a default acl, remove default acls.
   * @throws Exception
   */
  private void testDirAcls() throws Exception {
    if ( isLocalFS() ) {
      return;
    }

    final String defUser1 = "default:user:glarch:r-x";

    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
    FileSystem httpfs = getHttpFSFileSystem();

    Path dir = getProxiedFSTestDir();

    /* ACL Status on a directory */
    AclStatus proxyAclStat = proxyFs.getAclStatus(dir);
    AclStatus httpfsAclStat = httpfs.getAclStatus(dir);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, dir);

    /* Set a default ACL on the directory */
    httpfs.setAcl(dir, (AclEntry.parseAclSpec(defUser1,true)));
    proxyAclStat = proxyFs.getAclStatus(dir);
    httpfsAclStat = httpfs.getAclStatus(dir);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, dir);

    /* Remove the default ACL */
    httpfs.removeDefaultAcl(dir);
    proxyAclStat = proxyFs.getAclStatus(dir);
    httpfsAclStat = httpfs.getAclStatus(dir);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, dir);
  }

  private void testEncryption() throws Exception {
    if (isLocalFS()) {
      return;
    }
    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
    FileSystem httpFs = getHttpFSFileSystem();
    FileStatus proxyStatus = proxyFs.getFileStatus(TestHdfsHelper
        .ENCRYPTED_FILE);
    assertTrue(proxyStatus.isEncrypted());
    FileStatus httpStatus = httpFs.getFileStatus(TestHdfsHelper
        .ENCRYPTED_FILE);
    assertTrue(httpStatus.isEncrypted());
    proxyStatus = proxyFs.getFileStatus(new Path("/"));
    httpStatus = httpFs.getFileStatus(new Path("/"));
    assertFalse(proxyStatus.isEncrypted());
    assertFalse(httpStatus.isEncrypted());
  }

  private void testErasureCoding() throws Exception {
    Assume.assumeFalse("Assume its not a local FS!", isLocalFS());
    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
    FileSystem httpFS = getHttpFSFileSystem();
    Path filePath = new Path(getProxiedFSTestDir(), "foo.txt");
    proxyFs.create(filePath).close();

    ContractTestUtils.assertNotErasureCoded(httpFS, getProxiedFSTestDir());
    ContractTestUtils.assertNotErasureCoded(httpFS, filePath);
    ContractTestUtils.assertErasureCoded(httpFS,
        TestHdfsHelper.ERASURE_CODING_DIR);
    ContractTestUtils.assertErasureCoded(httpFS,
        TestHdfsHelper.ERASURE_CODING_FILE);

    proxyFs.close();
    httpFS.close();
  }

  private void testStoragePolicy() throws Exception {
    Assume.assumeFalse("Assume its not a local FS", isLocalFS());
    FileSystem fs = FileSystem.get(getProxiedFSConf());
    fs.mkdirs(getProxiedFSTestDir());
    Path path = new Path(getProxiedFSTestDir(), "policy.txt");
    FileSystem httpfs = getHttpFSFileSystem();
    // test getAllStoragePolicies
    Assert.assertArrayEquals(
        "Policy array returned from the DFS and HttpFS should be equals",
        fs.getAllStoragePolicies().toArray(), httpfs.getAllStoragePolicies().toArray());

    // test get/set/unset policies
    DFSTestUtil.createFile(fs, path, 0, (short) 1, 0L);
    // get defaultPolicy
   BlockStoragePolicySpi defaultdfsPolicy = fs.getStoragePolicy(path);
    // set policy through webhdfs
    httpfs.setStoragePolicy(path, HdfsConstants.COLD_STORAGE_POLICY_NAME);
    // get policy from dfs
    BlockStoragePolicySpi dfsPolicy = fs.getStoragePolicy(path);
    // get policy from webhdfs
    BlockStoragePolicySpi httpFsPolicy = httpfs.getStoragePolicy(path);
    Assert
       .assertEquals(
            "Storage policy returned from the get API should"
            + " be same as set policy",
            HdfsConstants.COLD_STORAGE_POLICY_NAME.toString(),
            httpFsPolicy.getName());
    Assert.assertEquals(
        "Storage policy returned from the DFS and HttpFS should be equals",
        httpFsPolicy, dfsPolicy);
    // unset policy
    httpfs.unsetStoragePolicy(path);
    Assert
       .assertEquals(
            "After unset storage policy, the get API shoudld"
            + " return the default policy",
            defaultdfsPolicy, httpfs.getStoragePolicy(path));
    fs.close();
  }

  protected enum Operation {
    GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
    WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
    SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, QUOTA_USAGE, FILEACLS, DIRACLS,
    SET_XATTR, GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION,
    LIST_STATUS_BATCH, GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING,
    CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT,
    ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
    FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
    GET_SNAPSHOT_LIST, GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY,
    SATISFYSTORAGEPOLICY, GET_SNAPSHOT_DIFF_LISTING, GETFILEBLOCKLOCATIONS,
    GETFILELINKSTATUS, GETSTATUS, GETECPOLICIES, GETECCODECS, GETTRASHROOTS
  }
  @SuppressWarnings("methodlength")
  private void operation(Operation op) throws Exception {
    switch (op) {
    case GET:
      testGet();
      break;
    case OPEN:
      testOpen();
      break;
    case CREATE:
      testCreate();
      break;
    case APPEND:
      testAppend();
      break;
    case TRUNCATE:
      testTruncate();
      break;
    case CONCAT:
      testConcat();
      break;
    case RENAME:
      testRename();
      break;
    case DELETE:
      testDelete();
      break;
    case LIST_STATUS:
      testListStatus();
      testListSymLinkStatus();
      break;
    case WORKING_DIRECTORY:
      testWorkingdirectory();
      break;
    case MKDIRS:
      testMkdirs();
      break;
    case SET_TIMES:
      testSetTimes();
      break;
    case SET_PERMISSION:
      testSetPermission();
      break;
    case SET_OWNER:
      testSetOwner();
      break;
    case SET_REPLICATION:
      testSetReplication();
      break;
    case CHECKSUM:
      testChecksum();
      break;
    case CONTENT_SUMMARY:
      testContentSummary();
      break;
    case QUOTA_USAGE:
      testQuotaUsage();
      break;
    case FILEACLS:
      testFileAclsCustomizedUserAndGroupNames();
      testFileAcls();
      break;
    case DIRACLS:
      testDirAcls();
      break;
    case SET_XATTR:
      testSetXAttr();
      break;
    case REMOVE_XATTR:
      testRemoveXAttr();
      break;
    case GET_XATTRS:
      testGetXAttrs();
      break;
    case LIST_XATTRS:
      testListXAttrs();
      break;
    case ENCRYPTION:
      testEncryption();
      break;
    case LIST_STATUS_BATCH:
      testListStatusBatch();
      break;
    case GETTRASHROOT:
      testTrashRoot();
      break;
    case STORAGEPOLICY:
      testStoragePolicy();
      break;
    case ERASURE_CODING:
      testErasureCoding();
      break;
    case CREATE_SNAPSHOT:
      testCreateSnapshot();
      break;
    case RENAME_SNAPSHOT:
      testRenameSnapshot();
      break;
    case DELETE_SNAPSHOT:
      testDeleteSnapshot();
      break;
    case ALLOW_SNAPSHOT:
      testAllowSnapshot();
      break;
    case DISALLOW_SNAPSHOT:
      testDisallowSnapshot();
      break;
    case DISALLOW_SNAPSHOT_EXCEPTION:
      testDisallowSnapshotException();
      break;
    case FILE_STATUS_ATTR:
      testFileStatusAttr();
      break;
    case GET_SNAPSHOT_DIFF:
      testGetSnapshotDiff();
      testGetSnapshotDiffIllegalParam();
      break;
    case GET_SNAPSHOTTABLE_DIRECTORY_LIST:
      testGetSnapshottableDirListing();
      break;
    case GET_SNAPSHOT_LIST:
      testGetSnapshotListing();
      break;
    case GET_SERVERDEFAULTS:
      testGetServerDefaults();
      break;
    case CHECKACCESS:
      testAccess();
      break;
    case SETECPOLICY:
      testErasureCodingPolicy();
      break;
    case SATISFYSTORAGEPOLICY:
      testStoragePolicySatisfier();
      break;
    case GET_SNAPSHOT_DIFF_LISTING:
      testGetSnapshotDiffListing();
      break;
    case GETFILEBLOCKLOCATIONS:
      testGetFileBlockLocations();
      break;
    case GETFILELINKSTATUS:
      testGetFileLinkStatus();
      break;
    case GETSTATUS:
      testGetStatus();
      break;
    case GETECPOLICIES:
      testGetAllEEPolicies();
      break;
    case GETECCODECS:
      testGetECCodecs();
      break;
    case GETTRASHROOTS:
      testGetTrashRoots();
      break;
    }
  }

  @Parameterized.Parameters
  public static Collection operations() {
    Object[][] ops = new Object[Operation.values().length][];
    for (int i = 0; i < Operation.values().length; i++) {
      ops[i] = new Object[]{Operation.values()[i]};
    }
    //To test one or a subset of operations do:
    //return Arrays.asList(new Object[][]{ new Object[]{Operation.APPEND}});
    return Arrays.asList(ops);
  }

  private Operation operation;

  public BaseTestHttpFSWith(Operation operation) {
    this.operation = operation;
  }

  @Test
  @TestDir
  @TestJetty
  @TestHdfs
  public void testOperation() throws Exception {
    createHttpFSServer();
    operation(operation);
  }

  @Test
  @TestDir
  @TestJetty
  @TestHdfs
  public void testOperationDoAs() throws Exception {
    createHttpFSServer();
    UserGroupInformation ugi = UserGroupInformation.createProxyUser(HadoopUsersConfTestHelper.getHadoopUsers()[0],
                                                                    UserGroupInformation.getCurrentUser());
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        operation(operation);
        return null;
      }
    });
  }

  private void testCreateSnapshot(String snapshotName) throws Exception {
    if (!this.isLocalFS()) {
      Path snapshottablePath = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(snapshottablePath);
      //Now get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      if (snapshotName == null) {
        fs.createSnapshot(snapshottablePath);
      } else {
        fs.createSnapshot(snapshottablePath, snapshotName);
      }
      Path snapshotsDir = new Path("/tmp/tmp-snap-test/.snapshot");
      FileStatus[] snapshotItems = fs.listStatus(snapshotsDir);
      assertTrue("Should have exactly one snapshot.",
          snapshotItems.length == 1);
      String resultingSnapName = snapshotItems[0].getPath().getName();
      if (snapshotName == null) {
        assertTrue("Snapshot auto generated name not matching pattern",
            Pattern.matches("(s)(\\d{8})(-)(\\d{6})(\\.)(\\d{3})",
                resultingSnapName));
      } else {
        assertTrue("Snapshot name is not same as passed name.",
            snapshotName.equals(resultingSnapName));
      }
      cleanSnapshotTests(snapshottablePath, resultingSnapName);
    }
  }

  private void testCreateSnapshot() throws Exception {
    testCreateSnapshot(null);
    testCreateSnapshot("snap-with-name");
  }

  private void createSnapshotTestsPreconditions(Path snapshottablePath,
      Boolean allowSnapshot) throws Exception {
    //Needed to get a DistributedFileSystem instance, in order to
    //call allowSnapshot on the newly created directory
    DistributedFileSystem distributedFs = (DistributedFileSystem)
        FileSystem.get(snapshottablePath.toUri(), this.getProxiedFSConf());
    distributedFs.mkdirs(snapshottablePath);
    if (allowSnapshot) {
      distributedFs.allowSnapshot(snapshottablePath);
    }
    Path subdirPath = new Path("/tmp/tmp-snap-test/subdir");
    distributedFs.mkdirs(subdirPath);
  }

  private void createSnapshotTestsPreconditions(Path snapshottablePath)
      throws Exception {
    // Allow snapshot by default for snapshot test
    createSnapshotTestsPreconditions(snapshottablePath, true);
  }

  private void cleanSnapshotTests(Path snapshottablePath,
                                  String resultingSnapName) throws Exception {
    DistributedFileSystem distributedFs = (DistributedFileSystem)
        FileSystem.get(snapshottablePath.toUri(), this.getProxiedFSConf());
    distributedFs.deleteSnapshot(snapshottablePath, resultingSnapName);
    distributedFs.delete(snapshottablePath, true);
  }

  private void testRenameSnapshot() throws Exception {
    if (!this.isLocalFS()) {
      Path snapshottablePath = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(snapshottablePath);
      //Now get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      fs.createSnapshot(snapshottablePath, "snap-to-rename");
      fs.renameSnapshot(snapshottablePath, "snap-to-rename",
          "snap-new-name");
      Path snapshotsDir = new Path("/tmp/tmp-snap-test/.snapshot");
      FileStatus[] snapshotItems = fs.listStatus(snapshotsDir);
      assertTrue("Should have exactly one snapshot.",
          snapshotItems.length == 1);
      String resultingSnapName = snapshotItems[0].getPath().getName();
      assertTrue("Snapshot name is not same as passed name.",
          "snap-new-name".equals(resultingSnapName));
      cleanSnapshotTests(snapshottablePath, resultingSnapName);
    }
  }

  private void testDeleteSnapshot() throws Exception {
    if (!this.isLocalFS()) {
      Path snapshottablePath = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(snapshottablePath);
      //Now get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      fs.createSnapshot(snapshottablePath, "snap-to-delete");
      Path snapshotsDir = new Path("/tmp/tmp-snap-test/.snapshot");
      FileStatus[] snapshotItems = fs.listStatus(snapshotsDir);
      assertTrue("Should have exactly one snapshot.",
          snapshotItems.length == 1);
      fs.deleteSnapshot(snapshottablePath, "snap-to-delete");
      snapshotItems = fs.listStatus(snapshotsDir);
      assertTrue("There should be no snapshot anymore.",
          snapshotItems.length == 0);
      fs.delete(snapshottablePath, true);
    }
  }

  private void testAllowSnapshot() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot disallowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path, false);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      assertFalse("Snapshot should be disallowed by default",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Allow snapshot
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.allowSnapshot(path);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.allowSnapshot(path);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support allowSnapshot");
      }
      // Check FileStatus
      assertTrue("allowSnapshot failed",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup
      fs.delete(path, true);
    }
  }

  private void testDisallowSnapshot() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      assertTrue("Snapshot should be allowed by DFS",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Disallow snapshot
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.disallowSnapshot(path);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.disallowSnapshot(path);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support disallowSnapshot");
      }
      // Check FileStatus
      assertFalse("disallowSnapshot failed",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup
      fs.delete(path, true);
    }
  }

  private void testDisallowSnapshotException() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      assertTrue("Snapshot should be allowed by DFS",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Create some snapshots
      fs.createSnapshot(path, "snap-01");
      fs.createSnapshot(path, "snap-02");
      // Disallow snapshot
      boolean disallowSuccess = false;
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        try {
          httpFS.disallowSnapshot(path);
          disallowSuccess = true;
        } catch (SnapshotException e) {
          // Expect SnapshotException
        }
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        try {
          webHdfsFileSystem.disallowSnapshot(path);
          disallowSuccess = true;
        } catch (SnapshotException e) {
          // Expect SnapshotException
        }
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support disallowSnapshot");
      }
      if (disallowSuccess) {
        Assert.fail("disallowSnapshot doesn't throw SnapshotException when "
            + "disallowing snapshot on a directory with at least one snapshot");
      }
      // Check FileStatus, should still be enabled since
      // disallow snapshot should fail
      assertTrue("disallowSnapshot should not have succeeded",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup
      fs.deleteSnapshot(path, "snap-02");
      fs.deleteSnapshot(path, "snap-01");
      fs.delete(path, true);
    }
  }

  private void testGetSnapshotDiff() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      Assert.assertTrue(fs.getFileStatus(path).isSnapshotEnabled());
      // Create a file and take a snapshot
      Path file1 = new Path(path, "file1");
      testCreate(file1, false);
      fs.createSnapshot(path, "snap1");
      // Create another file and take a snapshot
      Path file2 = new Path(path, "file2");
      testCreate(file2, false);
      fs.createSnapshot(path, "snap2");

      try {
        // Get snapshot diff
        SnapshotDiffReport diffReport = null;
        if (fs instanceof HttpFSFileSystem) {
          HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
          diffReport = httpFS.getSnapshotDiffReport(path, "snap1", "snap2");
        } else if (fs instanceof WebHdfsFileSystem) {
          WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
          diffReport = webHdfsFileSystem.getSnapshotDiffReport(path, "snap1", "snap2");
        } else {
          Assert.fail(fs.getClass().getSimpleName() + " doesn't support getSnapshotDiff");
        }
        // Verify result with DFS
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
        SnapshotDiffReport dfsDiffReport = dfs.getSnapshotDiffReport(path, "snap1", "snap2");
        Assert.assertEquals(diffReport.toString(), dfsDiffReport.toString());
      } finally {
        // Cleanup
        fs.deleteSnapshot(path, "snap2");
        fs.deleteSnapshot(path, "snap1");
        fs.delete(path, true);
      }
    }
  }

  private void testGetSnapshotDiffIllegalParamCase(FileSystem fs, Path path,
      String oldsnapshotname, String snapshotname) throws IOException {
    try {
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.getSnapshotDiffReport(path, oldsnapshotname, snapshotname);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.getSnapshotDiffReport(path, oldsnapshotname,
            snapshotname);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support getSnapshotDiff");
      }
    } catch (SnapshotException|IllegalArgumentException|RemoteException e) {
      // Expect SnapshotException, IllegalArgumentException
      // or RemoteException(IllegalArgumentException)
      if (e instanceof RemoteException) {
        // Check RemoteException class name, should be IllegalArgumentException
        Assert.assertEquals(((RemoteException) e).getClassName()
            .compareTo(java.lang.IllegalArgumentException.class.getName()), 0);
      }
      return;
    }
    Assert.fail("getSnapshotDiff illegal param didn't throw Exception");
  }

  private void testGetSnapshotDiffIllegalParam() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      assertTrue("Snapshot should be allowed by DFS",
          fs.getFileStatus(path).isSnapshotEnabled());
      Assert.assertTrue(fs.getFileStatus(path).isSnapshotEnabled());
      // Get snapshot diff
      testGetSnapshotDiffIllegalParamCase(fs, path, "", "");
      testGetSnapshotDiffIllegalParamCase(fs, path, "snap1", "");
      testGetSnapshotDiffIllegalParamCase(fs, path, "", "snap2");
      testGetSnapshotDiffIllegalParamCase(fs, path, "snap1", "snap2");
      // Cleanup
      fs.delete(path, true);
    }
  }

  private void verifyGetSnapshottableDirListing(
      FileSystem fs, DistributedFileSystem dfs) throws Exception {
    // Get snapshottable directory list
    SnapshottableDirectoryStatus[] sds = null;
    if (fs instanceof HttpFSFileSystem) {
      HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
      sds = httpFS.getSnapshottableDirectoryList();
    } else if (fs instanceof WebHdfsFileSystem) {
      WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
      sds = webHdfsFileSystem.getSnapshottableDirectoryList();
    } else {
      Assert.fail(fs.getClass().getSimpleName() +
          " doesn't support getSnapshottableDirListing");
    }
    // Verify result with DFS
    SnapshottableDirectoryStatus[] dfssds = dfs.getSnapshottableDirListing();
    Assert.assertEquals(JsonUtil.toJsonString(sds),
        JsonUtil.toJsonString(dfssds));
  }

  private void testGetSnapshotListing() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      Assert.assertTrue(fs.getFileStatus(path).isSnapshotEnabled());
      // Create a file and take a snapshot
      Path file1 = new Path(path, "file1");
      testCreate(file1, false);
      fs.createSnapshot(path, "snap1");
      // Create another file and take a snapshot
      Path file2 = new Path(path, "file2");
      testCreate(file2, false);
      fs.createSnapshot(path, "snap2");
      // Get snapshot diff
      SnapshotStatus[] snapshotStatus = null;
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        snapshotStatus = httpFS.getSnapshotListing(path);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        snapshotStatus = webHdfsFileSystem.getSnapshotListing(path);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support getSnapshotDiff");
      }
      // Verify result with DFS
      DistributedFileSystem dfs = (DistributedFileSystem)
          FileSystem.get(path.toUri(), this.getProxiedFSConf());
      SnapshotStatus[] dfsStatus =
          dfs.getSnapshotListing(path);
      Assert.assertEquals(JsonUtil.toJsonString(snapshotStatus),
          JsonUtil.toJsonString(dfsStatus));
      // Cleanup
      fs.deleteSnapshot(path, "snap2");
      fs.deleteSnapshot(path, "snap1");
      fs.delete(path, true);
    }
  }


  private void testGetSnapshottableDirListing() throws Exception {
    if (!this.isLocalFS()) {
      FileSystem fs = this.getHttpFSFileSystem();
      // Create directories with snapshot allowed
      Path path1 = new Path("/tmp/tmp-snap-dirlist-test-1");
      DistributedFileSystem dfs = (DistributedFileSystem)
          FileSystem.get(path1.toUri(), this.getProxiedFSConf());
      // Verify response when there is no snapshottable directory
      verifyGetSnapshottableDirListing(fs, dfs);
      createSnapshotTestsPreconditions(path1);
      Assert.assertTrue(fs.getFileStatus(path1).isSnapshotEnabled());
      // Verify response when there is one snapshottable directory
      verifyGetSnapshottableDirListing(fs, dfs);
      Path path2 = new Path("/tmp/tmp-snap-dirlist-test-2");
      createSnapshotTestsPreconditions(path2);
      Assert.assertTrue(fs.getFileStatus(path2).isSnapshotEnabled());
      // Verify response when there are two snapshottable directories
      verifyGetSnapshottableDirListing(fs, dfs);

      // Clean up and verify
      fs.delete(path2, true);
      verifyGetSnapshottableDirListing(fs, dfs);
      fs.delete(path1, true);
      verifyGetSnapshottableDirListing(fs, dfs);
    }
  }

  private void testFileAclsCustomizedUserAndGroupNames() throws Exception {
    if (isLocalFS()) {
      return;
    }

    // Get appropriate conf from the cluster
    MiniDFSCluster miniDFSCluster = ((TestHdfsHelper) hdfsTestHelper)
        .getMiniDFSCluster();
    Configuration conf = miniDFSCluster.getConfiguration(0);
    // If we call getHttpFSFileSystem() without conf from the mini cluster,
    // WebHDFS will be initialized with the default ACL string, causing the
    // setAcl() later to fail. This is only an issue in the unit test.
    FileSystem httpfs = getHttpFSFileSystem(conf);
    if (!(httpfs instanceof WebHdfsFileSystem)
        && !(httpfs instanceof HttpFSFileSystem)) {
      Assert.fail(httpfs.getClass().getSimpleName() +
          " doesn't support custom user and group name pattern. "
          + "Only WebHdfsFileSystem and HttpFSFileSystem support it.");
    }
    final String aclUser = "user:123:rwx";
    final String aclGroup = "group:foo@bar:r--";
    final String aclSet = "user::rwx," + aclUser + ",group::r--," +
        aclGroup + ",other::r--";
    final String dir = "/aclFileTestCustom";
    // Create test file
    FileSystem proxyFs = FileSystem.get(conf);
    proxyFs.mkdirs(new Path(dir));
    Path path = new Path(dir, "/testACL");
    OutputStream os = proxyFs.create(path);
    os.write(1);
    os.close();
    // Set ACL
    httpfs.setAcl(path, AclEntry.parseAclSpec(aclSet, true));
    // Verify getAclStatus responses are the same
    AclStatus proxyAclStat = proxyFs.getAclStatus(path);
    AclStatus httpfsAclStat = httpfs.getAclStatus(path);
    assertSameAcls(httpfsAclStat, proxyAclStat);
    assertSameAcls(httpfs, proxyFs, path);
    // Verify that custom user and group are set.
    List<String> strEntries = new ArrayList<>();
    for (AclEntry aclEntry : httpfsAclStat.getEntries()) {
      strEntries.add(aclEntry.toStringStable());
    }
    Assert.assertTrue(strEntries.contains(aclUser));
    Assert.assertTrue(strEntries.contains(aclGroup));
    // Clean up
    proxyFs.delete(new Path(dir), true);
  }

  private void verifyGetServerDefaults(FileSystem fs, DistributedFileSystem dfs)
      throws Exception {
    FsServerDefaults sds = null;
    if (fs instanceof HttpFSFileSystem) {
      HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
      sds = httpFS.getServerDefaults();
    } else if (fs instanceof WebHdfsFileSystem) {
      WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
      sds = webHdfsFileSystem.getServerDefaults();
    } else {
      Assert.fail(
          fs.getClass().getSimpleName() + " doesn't support getServerDefaults");
    }
    // Verify result with DFS
    FsServerDefaults dfssds = dfs.getServerDefaults();
    Assert.assertEquals(JsonUtil.toJsonString(sds),
        JsonUtil.toJsonString(dfssds));
  }

  private void testGetServerDefaults() throws Exception {
    if (!this.isLocalFS()) {
      FileSystem fs = this.getHttpFSFileSystem();
      Path path1 = new Path("/");
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
          .get(path1.toUri(), this.getProxiedFSConf());
      verifyGetServerDefaults(fs, dfs);
    }
  }

  private void testAccess() throws Exception {
    if (!this.isLocalFS()) {
      FileSystem fs = this.getHttpFSFileSystem();
      Path path1 = new Path("/");
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
          .get(path1.toUri(), this.getProxiedFSConf());
      verifyAccess(fs, dfs);
    }
  }

  private void verifyAccess(FileSystem fs, DistributedFileSystem dfs)
      throws Exception {
    Path p1 = new Path("/p1");
    dfs.mkdirs(p1);
    dfs.setOwner(p1, "user1", "group1");
    dfs.setPermission(p1, new FsPermission((short) 0444));

    if (fs instanceof HttpFSFileSystem) {
      HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
      httpFS.access(p1, FsAction.READ);
    } else if (fs instanceof WebHdfsFileSystem) {
      WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
      webHdfsFileSystem.access(p1, FsAction.READ);
    } else {
      Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
    }
  }

  private void testErasureCodingPolicy() throws Exception {
    if (!this.isLocalFS()) {
      FileSystem fs = this.getHttpFSFileSystem();
      Path path1 = new Path("/");
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
          .get(path1.toUri(), this.getProxiedFSConf());
      final String dir = "/xattrTest";
      Path p1 = new Path(dir);

      final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
          .getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
      final String ecPolicyName = ecPolicy.getName();
      dfs.mkdirs(new Path(dir));
      dfs.enableErasureCodingPolicy(ecPolicyName);

      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.setErasureCodingPolicy(p1, ecPolicyName);
        ErasureCodingPolicy ecPolicy1 = httpFS.getErasureCodingPolicy(p1);
        assertEquals(ecPolicy, ecPolicy1);
        httpFS.unsetErasureCodingPolicy(p1);
        ecPolicy1 = httpFS.getErasureCodingPolicy(p1);
        Assert.assertNull(ecPolicy1);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.setErasureCodingPolicy(p1, ecPolicyName);
        ErasureCodingPolicy ecPolicy1 =
            webHdfsFileSystem.getErasureCodingPolicy(p1);
        assertEquals(ecPolicy, ecPolicy1);
        webHdfsFileSystem.unsetErasureCodingPolicy(p1);
        ecPolicy1 = dfs.getErasureCodingPolicy(p1);
        Assert.assertNull(ecPolicy1);
      } else {
        Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
      }
    }
  }

  public void testStoragePolicySatisfier() throws Exception {
    final String dir = "/parent";
    Path path1 = new Path(dir);
    String file = "/parent/file";
    Path filePath = new Path(file);
    if (!this.isLocalFS()) {
      FileSystem fs = this.getHttpFSFileSystem();
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
          .get(path1.toUri(), this.getProxiedFSConf());
      dfs.mkdirs(path1);
      dfs.create(filePath).close();
      dfs.setStoragePolicy(filePath, HdfsConstants.COLD_STORAGE_POLICY_NAME);
      BlockStoragePolicy storagePolicy =
          (BlockStoragePolicy) dfs.getStoragePolicy(filePath);
      assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
          storagePolicy.getName());
      Map<String, byte[]> xAttrs;
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.satisfyStoragePolicy(path1);
        xAttrs = httpFS.getXAttrs(path1);
        assertTrue(xAttrs
            .containsKey(HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY));
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.satisfyStoragePolicy(path1);
        xAttrs = webHdfsFileSystem.getXAttrs(path1);
        assertTrue(xAttrs
            .containsKey(HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY));
      } else {
        Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
      }
      dfs.delete(path1, true);
    }
  }

  private void testGetFileBlockLocations() throws Exception {
    BlockLocation[] blockLocations;
    Path testFile;
    if (!this.isLocalFS()) {
      FileSystem fs = this.getHttpFSFileSystem();
      testFile = new Path(getProxiedFSTestDir(), "singleBlock.txt");
      DFSTestUtil.createFile(fs, testFile, 1, (short) 1, 0L);
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        blockLocations = httpFS.getFileBlockLocations(testFile, 0, 1);
        assertNotNull(blockLocations);

        // verify HttpFSFileSystem.toBlockLocations()
        String jsonString = JsonUtil.toJsonString(blockLocations);
        JSONParser parser = new JSONParser();
        JSONObject jsonObject = (JSONObject) parser.parse(jsonString, (ContainerFactory) null);
        BlockLocation[] deserializedLocation = HttpFSFileSystem.toBlockLocations(jsonObject);
        assertEquals(blockLocations.length, deserializedLocation.length);
        for (int i = 0; i < blockLocations.length; i++) {
          assertEquals(blockLocations[i].toString(), deserializedLocation[i].toString());
        }
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        blockLocations = webHdfsFileSystem.getFileBlockLocations(testFile, 0, 1);
        assertNotNull(blockLocations);
      } else {
        Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
      }
    }
  }

  private void testGetSnapshotDiffListing() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      Assert.assertTrue(fs.getFileStatus(path).isSnapshotEnabled());
      // Create a file and take a snapshot
      Path file1 = new Path(path, "file1");
      testCreate(file1, false);
      fs.createSnapshot(path, "snap1");
      // Create another file and take a snapshot
      Path file2 = new Path(path, "file2");
      testCreate(file2, false);
      fs.createSnapshot(path, "snap2");
      // Get snapshot diff listing
      try {
        SnapshotDiffReportListing diffReportListing = null;
        byte[] emptyBytes = new byte[] {};
        if (fs instanceof HttpFSFileSystem) {
          HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
          diffReportListing =
              httpFS.getSnapshotDiffReportListing(path, "snap1", "snap2", emptyBytes, -1);
        } else if (fs instanceof WebHdfsFileSystem) {
          WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
          diffReportListing = webHdfsFileSystem
              .getSnapshotDiffReportListing(path.toUri().getPath(), "snap1", "snap2", emptyBytes,
                  -1);
        } else {
          Assert.fail(fs.getClass().getSimpleName() + " doesn't support getSnapshotDiff");
        }
        // Verify result with DFS
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
        SnapshotDiffReportListing dfsDiffReportListing =
            dfs.getSnapshotDiffReportListing(path, "snap1", "snap2",
                DFSUtil.bytes2String(emptyBytes), -1);
        assertHttpFsReportListingWithDfsClient(diffReportListing, dfsDiffReportListing);
      } finally {
        // Cleanup
        fs.deleteSnapshot(path, "snap2");
        fs.deleteSnapshot(path, "snap1");
        fs.delete(path, true);
      }
    }
  }

  private void testGetFileLinkStatus() throws Exception {
    if (isLocalFS()) {
      // do not test the symlink for local FS.
      return;
    }
    FileSystem fs = FileSystem.get(getProxiedFSConf());

    Path root = new Path(getProxiedFSTestDir(), "httpFSTest");
    Path file = new Path(root, "file");
    Path linkToFile = new Path(root, "linkToFile");

    OutputStream os = fs.create(file);
    os.write(1);
    fs.createSymlink(file, linkToFile, false);

    fs = this.getHttpFSFileSystem();

    assertFalse(fs.getFileLinkStatus(file).isSymlink());
    assertTrue(fs.getFileLinkStatus(linkToFile).isSymlink());
  }

  private void testGetStatus() throws Exception {
    if (isLocalFS()) {
      // do not test the getStatus for local FS.
      return;
    }
    final Path path = new Path("/foo");
    FileSystem fs = FileSystem.get(path.toUri(), this.getProxiedFSConf());
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs =
          (DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
      FileSystem httpFs = this.getHttpFSFileSystem();

      FsStatus dfsFsStatus = dfs.getStatus(path);
      FsStatus httpFsStatus = httpFs.getStatus(path);

      //Validate used free and capacity are the same as DistributedFileSystem
      assertEquals(dfsFsStatus.getUsed(), httpFsStatus.getUsed());
      assertEquals(dfsFsStatus.getRemaining(), httpFsStatus.getRemaining());
      assertEquals(dfsFsStatus.getCapacity(), httpFsStatus.getCapacity());
      httpFs.close();
      dfs.close();
    } else {
      Assert.fail(fs.getClass().getSimpleName() + " is not of type DistributedFileSystem.");
    }
  }

  private void testGetAllEEPolicies() throws Exception {
    if (isLocalFS()) {
      // do not test the getAllEEPolicies for local FS.
      return;
    }
    final Path path = new Path("/foo");
    FileSystem fs = FileSystem.get(path.toUri(), this.getProxiedFSConf());
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs =
          (DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
      FileSystem httpFs = this.getHttpFSFileSystem();

      Collection<ErasureCodingPolicyInfo> dfsAllErasureCodingPolicies =
          dfs.getAllErasureCodingPolicies();
      Collection<ErasureCodingPolicyInfo> diffErasureCodingPolicies = null;

      if (httpFs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) httpFs;
        diffErasureCodingPolicies = httpFS.getAllErasureCodingPolicies();
      } else if (httpFs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) httpFs;
        diffErasureCodingPolicies = webHdfsFileSystem.getAllErasureCodingPolicies();
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " is not of type HttpFSFileSystem or WebHdfsFileSystem");
      }

      //Validate erasureCodingPolicyInfos are the same as DistributedFileSystem
      assertEquals(dfsAllErasureCodingPolicies.size(), diffErasureCodingPolicies.size());
      assertTrue(dfsAllErasureCodingPolicies.containsAll(diffErasureCodingPolicies));
    } else {
      Assert.fail(fs.getClass().getSimpleName() + " is not of type DistributedFileSystem.");
    }
  }

  private void testGetECCodecs() throws Exception {
    if (isLocalFS()) {
      // do not test the testGetECCodecs for local FS.
      return;
    }
    final Path path = new Path("/foo");

    FileSystem fs = FileSystem.get(path.toUri(), this.getProxiedFSConf());
    LambdaTestUtils.intercept(AssertionError.class, () -> {
      if (!(fs instanceof DistributedFileSystem)) {
        throw new AssertionError(fs.getClass().getSimpleName() +
            " is not of type DistributedFileSystem.");
      }
    });

    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
    FileSystem httpFs = this.getHttpFSFileSystem();

    Map<String, String> dfsErasureCodingCodecs = dfs.getAllErasureCodingCodecs();

    final AtomicReference<Map<String, String>> diffErasureCodingCodecsRef =
        new AtomicReference<>();
    LambdaTestUtils.intercept(AssertionError.class, () -> {
      if (httpFs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFSFileSystem = (HttpFSFileSystem) httpFs;
        diffErasureCodingCodecsRef.set(httpFSFileSystem.getAllErasureCodingCodecs());
      } else if (httpFs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) httpFs;
        diffErasureCodingCodecsRef.set(webHdfsFileSystem.getAllErasureCodingCodecs());
      } else {
        throw new AssertionError(httpFs.getClass().getSimpleName() +
            " is not of type HttpFSFileSystem or WebHdfsFileSystem");
      }
    });
    Map<String, String> diffErasureCodingCodecs = diffErasureCodingCodecsRef.get();

    //Validate testGetECCodecs are the same as DistributedFileSystem
    Assert.assertEquals(dfsErasureCodingCodecs.size(), diffErasureCodingCodecs.size());

    for (Map.Entry<String, String> entry : dfsErasureCodingCodecs.entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();
      Assert.assertTrue(diffErasureCodingCodecs.containsKey(key));
      Assert.assertEquals(value, diffErasureCodingCodecs.get(key));
    }
  }

  private void testGetTrashRoots() throws Exception {
    if (isLocalFS()) {
      // do not test the getAllEEPolicies for local FS.
      return;
    }
    final Path path = new Path("/");
    FileSystem fs = FileSystem.get(path.toUri(), this.getProxiedFSConf());
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs =
          (DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
      FileSystem httpFs = this.getHttpFSFileSystem();

      // Create trash root for user0
      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user0");
      String user0HomeStr = DFSUtilClient.getHomeDirectory(this.getProxiedFSConf(), ugi);
      Path user0Trash = new Path(user0HomeStr, FileSystem.TRASH_PREFIX);
      dfs.mkdirs(user0Trash);

      Collection<FileStatus> dfsTrashRoots = dfs.getTrashRoots(true);
      Collection<FileStatus> diffTrashRoots = null;

      if (httpFs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) httpFs;
        diffTrashRoots = httpFS.getTrashRoots(true);
      } else if (httpFs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) httpFs;
        diffTrashRoots = webHdfsFileSystem.getTrashRoots(true);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " is not of type HttpFSFileSystem or WebHdfsFileSystem");
      }

      // Validate getTrashRoots are the same as DistributedFileSystem
      assertEquals(dfsTrashRoots.size(), diffTrashRoots.size());
    } else {
      Assert.fail(fs.getClass().getSimpleName() + " is not of type DistributedFileSystem.");
    }
  }

  private void assertHttpFsReportListingWithDfsClient(SnapshotDiffReportListing diffReportListing,
      SnapshotDiffReportListing dfsDiffReportListing) {
    Assert.assertEquals(diffReportListing.getCreateList().size(),
        dfsDiffReportListing.getCreateList().size());
    Assert.assertEquals(diffReportListing.getDeleteList().size(),
        dfsDiffReportListing.getDeleteList().size());
    Assert.assertEquals(diffReportListing.getModifyList().size(),
        dfsDiffReportListing.getModifyList().size());
    Assert.assertEquals(diffReportListing.getIsFromEarlier(),
        dfsDiffReportListing.getIsFromEarlier());
    Assert.assertEquals(diffReportListing.getLastIndex(), dfsDiffReportListing.getLastIndex());
    Assert.assertEquals(DFSUtil.bytes2String(diffReportListing.getLastPath()),
        DFSUtil.bytes2String(dfsDiffReportListing.getLastPath()));
    int i = 0;
    for (SnapshotDiffReportListing.DiffReportListingEntry entry : diffReportListing
        .getCreateList()) {
      SnapshotDiffReportListing.DiffReportListingEntry dfsDiffEntry =
          dfsDiffReportListing.getCreateList().get(i);
      Assert.assertEquals(entry.getDirId(), dfsDiffEntry.getDirId());
      Assert.assertEquals(entry.getFileId(), dfsDiffEntry.getFileId());
      Assert.assertArrayEquals(DFSUtilClient.byteArray2bytes(entry.getSourcePath()),
          DFSUtilClient.byteArray2bytes(dfsDiffEntry.getSourcePath()));
      i++;
    }
    i = 0;
    for (SnapshotDiffReportListing.DiffReportListingEntry entry : diffReportListing
        .getDeleteList()) {
      SnapshotDiffReportListing.DiffReportListingEntry dfsDiffEntry =
          dfsDiffReportListing.getDeleteList().get(i);
      Assert.assertEquals(entry.getDirId(), dfsDiffEntry.getDirId());
      Assert.assertEquals(entry.getFileId(), dfsDiffEntry.getFileId());
      Assert.assertArrayEquals(DFSUtilClient.byteArray2bytes(entry.getSourcePath()),
          DFSUtilClient.byteArray2bytes(dfsDiffEntry.getSourcePath()));
      i++;
    }
    i = 0;
    for (SnapshotDiffReportListing.DiffReportListingEntry entry : diffReportListing
        .getModifyList()) {
      SnapshotDiffReportListing.DiffReportListingEntry dfsDiffEntry =
          dfsDiffReportListing.getModifyList().get(i);
      Assert.assertEquals(entry.getDirId(), dfsDiffEntry.getDirId());
      Assert.assertEquals(entry.getFileId(), dfsDiffEntry.getFileId());
      Assert.assertArrayEquals(DFSUtilClient.byteArray2bytes(entry.getSourcePath()),
          DFSUtilClient.byteArray2bytes(dfsDiffEntry.getSourcePath()));
      i++;
    }
  }

}