TestWebHDFS.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.web;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.TestDistributedFileSystem.checkOpStatistics;
import static org.apache.hadoop.hdfs.TestDistributedFileSystem.checkStatistics;
import static org.apache.hadoop.hdfs.TestDistributedFileSystem.getOpStatistics;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;

import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.test.LambdaTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.test.Whitebox;
import org.slf4j.event.Level;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/** Test WebHDFS */
public class TestWebHDFS {
  /** Logger used by the tests and helpers in this class. */
  static final Logger LOG = LoggerFactory.getLogger(TestWebHDFS.class);
  
  /** Unseeded random source for file contents and read offsets. */
  static final Random RANDOM = new Random();
  
  /**
   * {@link System#nanoTime()} sampled when this class is initialized;
   * {@link Ticker#tick} reports elapsed minutes relative to it.
   */
  static final long systemStartTime = System.nanoTime();

  /** Cluster started by the current test; released in {@link #tearDown()}. */
  private static MiniDFSCluster cluster = null;

  /** Shuts down the mini cluster left behind by a test, if there is one. */
  @After
  public void tearDown() {
    if (cluster == null) {
      // The test never started a cluster; nothing to release.
      return;
    }
    cluster.shutdown();
    cluster = null;
  }

  /**
   * Progress and throughput logger for a long-running transfer.
   * All timestamps are taken from {@link System#nanoTime()}.
   */
  static class Ticker {
    final String name;
    final long startTime = System.nanoTime();
    private long previousTick = startTime;

    /** Logs a START line made of {@code name} and the formatted details. */
    Ticker(final String name, String format, Object... args) {
      this.name = name;
      final String details = String.format(format, args);
      LOG.info(String.format("\n\n%s START: %s\n", name, details));
    }

    /**
     * Logs a progress line, but only when more than ten seconds have
     * elapsed since the previous logged tick (or since construction).
     */
    void tick(final long nBytes, String format, Object... args) {
      final long now = System.nanoTime();
      if (now - previousTick <= 10000000000L) {
        return;
      }
      previousTick = now;
      final double minutes = (now - systemStartTime)/60000000000.0;
      final String details = String.format(format, args);
      final String throughput = toMpsString(nBytes, now);
      LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, minutes,
          details, throughput));
    }

    /** Logs the END line with the total duration and overall throughput. */
    void end(final long nBytes) {
      final long now = System.nanoTime();
      final double elapsedSeconds = (now - startTime)/1000000000.0;
      final String throughput = toMpsString(nBytes, now);
      LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
          name, elapsedSeconds, throughput));
    }

    /**
     * Formats {@code nBytes} as megabytes together with the average
     * MB/s between {@code startTime} and {@code now}.
     */
    String toMpsString(final long nBytes, final long now) {
      final double megabytes = nBytes/(double)(1<<20);
      final double megabytesPerSecond =
          megabytes*1000000000.0/(now - startTime);
      return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]",
          megabytes, megabytesPerSecond);
    }
  }

  /** Runs {@link #largeFileTest(long)} with a 200MB file. */
  @Test(timeout=300000)
  public void testLargeFile() throws Exception {
    final long fileLength = 200L * 1024 * 1024; // 200MB
    largeFileTest(fileLength);
  }

  /**
   * Test read and write large files.
   *
   * Starts a three-datanode mini cluster, writes {@code fileLength} bytes
   * of a repeating 1MB random pattern through WebHDFS, and then reads the
   * file back with seek and positional reads from two random offsets.
   *
   * @param fileLength number of bytes to write; the read offsets are derived
   *     from a random value in [1MB, 2MB), so this should be at least 2MB
   *     for the large offset to be non-negative
   * @throws Exception if cluster setup or any file system operation fails
   */
  static void largeFileTest(final long fileLength) throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();

    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3)
        .build();
    cluster.waitActive();

    final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final Path dir = new Path("/test/largeFile");
    Assert.assertTrue(fs.mkdirs(dir));

    final byte[] data = new byte[1 << 20];
    RANDOM.nextBytes(data);

    // Two back-to-back copies of the pattern, so checkData can compare a
    // full buffer starting at any position within the pattern.
    final byte[] expected = new byte[2 * data.length];
    System.arraycopy(data, 0, expected, 0, data.length);
    System.arraycopy(data, 0, expected, data.length, data.length);

    final Path p = new Path(dir, "file");
    final Ticker t = new Ticker("WRITE", "fileLength=%d", fileLength);
    // try-with-resources: the stream is always closed, and a failure in
    // write() is not masked by a secondary failure in close().
    try (FSDataOutputStream out = fs.create(p)) {
      long remaining = fileLength;
      while (remaining > 0) {
        t.tick(fileLength - remaining, "remaining=%d", remaining);

        final int n = (int) Math.min(remaining, data.length);
        out.write(data, 0, n);
        remaining -= n;
      }
    }
    t.end(fileLength);

    Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());

    final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
    final long largeOffset = fileLength - smallOffset;
    final byte[] buf = new byte[data.length];

    verifySeek(fs, p, largeOffset, fileLength, buf, expected);
    verifySeek(fs, p, smallOffset, fileLength, buf, expected);

    verifyPread(fs, p, largeOffset, fileLength, buf, expected);
  }

  /**
   * Compares the first {@code n} bytes of {@code actual} with
   * {@code expected}, starting in {@code expected} at
   * {@code offset % actual.length}. The comparison is sampled: it only
   * runs when a random draw out of 100 comes up zero.
   *
   * @param offset file offset the buffer was read from
   * @param remaining bytes left to read; used only in the failure message
   * @param n number of valid bytes in {@code actual}
   * @param actual buffer that was just read
   * @param expected reference bytes to compare against
   */
  static void checkData(long offset, long remaining, int n,
      byte[] actual, byte[] expected) {
    if (RANDOM.nextInt(100) != 0) {
      return;
    }
    final int start = (int)(offset % actual.length);
    for (int i = 0; i < n; i++) {
      final int j = start + i;
      if (expected[j] == actual[i]) {
        continue;
      }
      Assert.fail("expected[" + j + "]=" + expected[j]
          + " != actual[" + i + "]=" + actual[i]
          + ", offset=" + offset + ", remaining=" + remaining + ", n=" + n);
    }
  }

  /**
   * Test seek: opens {@code p}, seeks to {@code offset} and reads
   * sequentially up to {@code length}, passing every chunk to
   * {@link #checkData}.
   *
   * @param fs file system to read through
   * @param p file to read
   * @param offset position to seek to before reading
   * @param length position at which reading stops
   * @param buf scratch buffer; its length is the maximum chunk size
   * @param expected reference bytes handed to {@link #checkData}
   * @throws IOException if opening, seeking or reading fails
   */
  static void verifySeek(FileSystem fs, Path p, long offset, long length,
      byte[] buf, byte[] expected) throws IOException {
    long remaining = length - offset;
    long checked = 0;
    LOG.info("XXX SEEK: offset={}, remaining={}", offset, remaining);

    final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
        offset, remaining);
    // try-with-resources: the stream used to leak whenever a read or a
    // data check failed before the explicit close() was reached.
    try (FSDataInputStream in = fs.open(p, 64 << 10)) {
      in.seek(offset);
      while (remaining > 0) {
        t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
        final int n = (int)Math.min(remaining, buf.length);
        in.readFully(buf, 0, n);
        checkData(offset, remaining, n, buf, expected);

        offset += n;
        remaining -= n;
        checked += n;
      }
    }
    t.end(checked);
  }

  /**
   * Test positional read: opens {@code p} and reads from {@code offset} up
   * to {@code length} with {@code readFully(position, ...)}, passing every
   * chunk to {@link #checkData}.
   *
   * @param fs file system to read through
   * @param p file to read
   * @param offset position of the first positional read
   * @param length position at which reading stops
   * @param buf scratch buffer; its length is the maximum chunk size
   * @param expected reference bytes handed to {@link #checkData}
   * @throws IOException if opening or reading fails
   */
  static void verifyPread(FileSystem fs, Path p, long offset, long length,
      byte[] buf, byte[] expected) throws IOException {
    long remaining = length - offset;
    long checked = 0;
    LOG.info("XXX PREAD: offset={}, remaining={}", offset, remaining);

    final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
        offset, remaining);
    // try-with-resources: the stream used to leak whenever a read or a
    // data check failed before the explicit close() was reached.
    try (FSDataInputStream in = fs.open(p, 64 << 10)) {
      while (remaining > 0) {
        t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
        final int n = (int)Math.min(remaining, buf.length);
        in.readFully(offset, buf, 0, n);
        checkData(offset, remaining, n, buf, expected);

        offset += n;
        remaining -= n;
        checked += n;
      }
    }
    t.end(checked);
  }

  /**
   * Runs the shared namenode-restart retry scenario from
   * {@link TestDFSClientRetries} with a WebHDFS configuration.
   */
  @Test(timeout=300000)
  public void testNamenodeRestart() throws Exception {
    // Trace the NN-side WebHDFS handlers for the duration of the scenario.
    GenericTestUtils.setLogLevel(NamenodeWebHdfsMethods.LOG, Level.TRACE);
    TestDFSClientRetries.namenodeRestartTest(
        WebHdfsTestUtil.createConf(), true);
  }
  
  /**
   * Lists a directory that holds more entries than the configured listing
   * limit and checks that {@code listStatus} and {@code listStatusIterator}
   * return the same statuses when called as a non-superuser.
   */
  @Test(timeout=300000)
  public void testLargeDirectory() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final int listLimit = 2;
    // force small chunking of directory listing
    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, listLimit);
    // force paths to be only owner-accessible to ensure ugi isn't changing
    // during listStatus
    FsPermission.setUMask(conf, new FsPermission((short)0077));
    
    cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();
    // make the root directory fully accessible; presumably so that the
    // unprivileged test user below can create its directory there
    WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME)
        .setPermission(new Path("/"),
            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));

    // trick the NN into believing the login user is not the superuser so we
    // can tell if the correct user is used by listStatus
    UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting(
        "not-superuser", new String[] {"not-supergroup" }));

    UserGroupInformation.createUserForTesting("me", new String[] {"my-group" })
        .doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws IOException, URISyntaxException {
            FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
                WebHdfsConstants.WEBHDFS_SCHEME);
            Path d = new Path("/my-dir");
            Assert.assertTrue(fs.mkdirs(d));
            // Iterator should have no items when dir is empty
            RemoteIterator<FileStatus> it = fs.listStatusIterator(d);
            assertFalse(it.hasNext());
            Path p = new Path(d, "file-" + 0);
            Assert.assertTrue(fs.createNewFile(p));
            // Iterator should have an item when dir is not empty
            it = fs.listStatusIterator(d);
            assertTrue(it.hasNext());
            it.next();
            assertFalse(it.hasNext());
            // file-0 already exists, so start at 1; the directory ends up
            // with three times the listing limit
            for (int i = 1; i < listLimit * 3; i++) {
              p = new Path(d, "file-" + i);
              Assert.assertTrue(fs.createNewFile(p));
            }
            // Check the FileStatus[] listing
            FileStatus[] statuses = fs.listStatus(d);
            Assert.assertEquals(listLimit * 3, statuses.length);
            // Check the iterator-based listing
            GenericTestUtils.setLogLevel(WebHdfsFileSystem.LOG, Level.TRACE);
            GenericTestUtils.setLogLevel(NamenodeWebHdfsMethods.LOG,
                Level.TRACE);
            it = fs.listStatusIterator(d);
            int count = 0;
            while (it.hasNext()) {
              FileStatus stat = it.next();
              assertEquals("FileStatuses not equal", statuses[count], stat);
              count++;
            }
            assertEquals("Different # of statuses!", statuses.length, count);
            // Do some more basic iterator tests
            it = fs.listStatusIterator(d);
            // Try advancing the iterator without calling hasNext()
            for (int i = 0; i < statuses.length; i++) {
              FileStatus stat = it.next();
              assertEquals("FileStatuses not equal", statuses[i], stat);
            }
            assertFalse("No more items expected", it.hasNext());
            // Try doing next when out of items
            try {
              it.next();
              fail("Iterator should error if out of elements.");
            } catch (NoSuchElementException e) {
              // pass
            }
            return null;
          }
        });
  }

  /**
   * Writes more data than the directory's space quota allows and expects
   * the WebHDFS client to surface a {@link DSQuotaExceededException}.
   */
  @Test
  public void testExceedingFileSpaceQuota() throws Exception {
    final long spaceQuota = 50L << 20;
    final long fileLength = 80L << 20;
    final Configuration conf = WebHdfsTestUtil.createConf();

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();

    final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final Path dir = new Path("/test/largeFile");
    assertTrue(fs.mkdirs(dir));

    final byte[] chunk = new byte[1 << 20];
    RANDOM.nextBytes(chunk);

    // Only the space quota is set; the namespace quota is left untouched.
    cluster.getFileSystem().setQuota(dir, HdfsConstants.QUOTA_DONT_SET,
        spaceQuota);

    final Path file = new Path(dir, "file");
    final FSDataOutputStream out = fs.create(file);
    try {
      long remaining = fileLength;
      while (remaining > 0) {
        final int n = (int) Math.min(remaining, chunk.length);
        out.write(chunk, 0, n);
        remaining -= n;
      }
      fail("should have thrown exception during the write");
    } catch (DSQuotaExceededException expectedFailure) {
      // expected
    } finally {
      try {
        out.close();
      } catch (Exception ignored) {
        // discard exception from close
      }
    }
  }

  /**
   * Configures custom WebHDFS user-name and ACL-spec patterns and checks
   * that a numeric user can create a directory and modify ACL entries that
   * name a numeric user and a group containing '@'.
   */
  @Test(timeout=300000)
  public void testCustomizedUserAndGroupNames() throws Exception {
    // Username pattern that also allows numeric usernames
    final String userPattern = "^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$";
    // ACL pattern that allows numeric and "@" characters in user/groups
    final String aclPattern =
        "^(default:)?(user|group|mask|other):" +
            "[[0-9A-Za-z_][@A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?" +
            "(user|group|mask|other):[[0-9A-Za-z_][@A-Za-z0-9._-]]*:([rwx-]{3})?)*$";

    final Configuration conf = WebHdfsTestUtil.createConf();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
    conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, userPattern);
    conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_KEY,
        aclPattern);

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();

    final FileSystem adminFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    adminFs.setPermission(new Path("/"),
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));

    // Test a numeric username
    final UserGroupInformation numericUser = UserGroupInformation
        .createUserForTesting("123", new String[] {"my-group" });
    numericUser.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws IOException, URISyntaxException {
        final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
            WebHdfsConstants.WEBHDFS_SCHEME);
        final Path d = new Path("/my-dir");
        Assert.assertTrue(fs.mkdirs(d));
        // Test also specifying a default ACL with a numeric username
        // and another of a groupname with '@'
        final AclEntry numericUserEntry = new AclEntry.Builder()
            .setPermission(FsAction.READ)
            .setScope(AclEntryScope.DEFAULT)
            .setType(AclEntryType.USER)
            .setName("11010")
            .build();
        final AclEntry atSignGroupEntry = new AclEntry.Builder()
            .setPermission(FsAction.READ_WRITE)
            .setType(AclEntryType.GROUP)
            .setName("foo@bar")
            .build();
        fs.modifyAclEntries(d,
            ImmutableList.of(numericUserEntry, atSignGroupEntry));
        return null;
      }
    });
  }

  /**
   * Creating a file on a cluster that runs no datanode must fail with an
   * IOException whose message contains "Failed to find datanode".
   */
  @Test(timeout=300000)
  public void testCreateWithNoDN() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
      cluster.waitActive();
      final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
          WebHdfsConstants.WEBHDFS_SCHEME);
      final Path target = new Path("/testnodatanode");
      webHdfs.create(target);
      Assert.fail("No exception was thrown");
    } catch (IOException e) {
      GenericTestUtils.assertExceptionContains("Failed to find datanode", e);
    }
  }

  /**
   * Issues a CREATE for a path with duplicated slashes and verifies the
   * error response through {@link #assertResponse(String)}.
   */
  @Test
  public void testWebHdfsCreateWithInvalidPath() throws Exception {
    cluster = new MiniDFSCluster.Builder(WebHdfsTestUtil.createConf())
        .numDataNodes(0)
        .build();
    // A path name include duplicated slashes.
    assertResponse("//tmp//file");
  }

  /**
   * Builds the WebHDFS CREATE URL for {@code path} on the first namenode,
   * using the {@code user.name} system property as the request user.
   */
  private String getUri(String path) {
    final String userName = System.getProperty("user.name");
    return new StringBuilder(cluster.getHttpUri(0))
        .append("/webhdfs/v1")
        .append(path)
        .append("?op=CREATE")
        .append("&user.name=")
        .append(userName)
        .toString();
  }

  /**
   * Sends a PUT to the CREATE URL for {@code path} and asserts that the
   * server answers with HTTP 400 and a JSON body whose
   * {@code RemoteException.exception} is {@code InvalidPathException}.
   *
   * @param path path component to place after {@code /webhdfs/v1}
   * @throws IOException if the connection cannot be opened or read
   */
  private void assertResponse(String path) throws IOException {
    final URL url = new URL(getUri(path));
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    // Disconnect in finally: a failed assertion must not leak the connection.
    try {
      conn.setRequestMethod("PUT");
      // Assert response code.
      assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
      // Assert exception.
      final Map<?, ?> response = WebHdfsFileSystem.jsonParse(conn, true);
      final Map<?, ?> remoteException =
          (Map<?, ?>) response.get("RemoteException");
      Assert.assertNotNull("Response has no RemoteException: " + response,
          remoteException);
      assertEquals("InvalidPathException", remoteException.get("exception"));
    } finally {
      conn.disconnect();
    }
  }

  /**
   * Test allow and disallow snapshot through WebHdfs. Verifying webhdfs with
   * Distributed filesystem methods.
   *
   * The test runs two rounds: first snapshots are allowed through WebHDFS
   * and disallowed through DFS, then allowed through DFS and disallowed
   * through WebHDFS. After each change both file systems are queried.
   */
  @Test
  public void testWebHdfsAllowandDisallowSnapshots() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    final Path bar = new Path("/bar");
    dfs.mkdirs(bar);

    // allow snapshots on /bar using webhdfs
    webHdfs.allowSnapshot(bar);
    // check if snapshot status is enabled
    assertTrue(dfs.getFileStatus(bar).isSnapshotEnabled());
    assertTrue(webHdfs.getFileStatus(bar).isSnapshotEnabled());
    webHdfs.createSnapshot(bar, "s1");
    final Path s1path = SnapshotTestHelper.getSnapshotRoot(bar, "s1");
    Assert.assertTrue(webHdfs.exists(s1path));
    SnapshottableDirectoryStatus[] snapshottableDirs =
        dfs.getSnapshottableDirListing();
    assertEquals(1, snapshottableDirs.length);
    assertEquals(bar, snapshottableDirs[0].getFullPath());
    // remove the snapshot, then disallow snapshots on /bar using dfs
    dfs.deleteSnapshot(bar, "s1");
    dfs.disallowSnapshot(bar);
    // check if snapshot status is disabled
    assertFalse(dfs.getFileStatus(bar).isSnapshotEnabled());
    assertFalse(webHdfs.getFileStatus(bar).isSnapshotEnabled());
    snapshottableDirs = dfs.getSnapshottableDirListing();
    assertNull(snapshottableDirs);

    // second round: allow snapshots on /bar using dfs this time, so that
    // they can be disallowed using webhdfs below
    dfs.allowSnapshot(bar);
    // check if snapshot status is enabled, again
    assertTrue(dfs.getFileStatus(bar).isSnapshotEnabled());
    assertTrue(webHdfs.getFileStatus(bar).isSnapshotEnabled());
    snapshottableDirs = dfs.getSnapshottableDirListing();
    assertEquals(1, snapshottableDirs.length);
    assertEquals(bar, snapshottableDirs[0].getFullPath());
    // disallow snapshots on /bar using webhdfs
    webHdfs.disallowSnapshot(bar);
    // check if snapshot status is disabled, again
    assertFalse(dfs.getFileStatus(bar).isSnapshotEnabled());
    assertFalse(webHdfs.getFileStatus(bar).isSnapshotEnabled());
    snapshottableDirs = dfs.getSnapshottableDirListing();
    assertNull(snapshottableDirs);
    // with snapshots disallowed, creating one through webhdfs must fail
    try {
      webHdfs.createSnapshot(bar);
      fail("Cannot create snapshot on a non-snapshottable directory");
    } catch (Exception e) {
      GenericTestUtils.assertExceptionContains(
          "Directory is not a snapshottable directory", e);
    }
  }

  /**
   * Checks that the {@link FileStatus} returned through WebHDFS reports the
   * same erasure-coding flag as {@link DistributedFileSystem}, for an
   * erasure-coded directory and file as well as for a plain directory and
   * file, and that the flag shows up in {@code FileStatus#toString()}.
   */
  @Test (timeout = 60000)
  public void testWebHdfsErasureCodingFiles() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final String ecPolicyName = SystemErasureCodingPolicies
        .getByID(SystemErasureCodingPolicies.XOR_2_1_POLICY_ID).getName();
    dfs.enableErasureCodingPolicy(ecPolicyName);
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    final Path ecDir = new Path("/ec");
    dfs.mkdirs(ecDir);
    dfs.setErasureCodingPolicy(ecDir, ecPolicyName);
    final Path ecFile = new Path(ecDir, "ec-file.log");
    DFSTestUtil.createFile(dfs, ecFile, 1024 * 10, (short) 1, 0xFEED);

    final Path normalDir = new Path("/dir");
    dfs.mkdirs(normalDir);
    final Path normalFile = new Path(normalDir, "file.log");
    DFSTestUtil.createFile(dfs, normalFile, 1024 * 10, (short) 1, 0xFEED);

    FileStatus expectedECDirStatus = dfs.getFileStatus(ecDir);
    FileStatus actualECDirStatus = webHdfs.getFileStatus(ecDir);
    Assert.assertEquals(expectedECDirStatus.isErasureCoded(),
        actualECDirStatus.isErasureCoded());
    ContractTestUtils.assertErasureCoded(dfs, ecDir);
    assertTrue(
        ecDir + " should have erasure coding set in "
            + "FileStatus#toString(): " + actualECDirStatus,
        actualECDirStatus.toString().contains("isErasureCoded=true"));

    FileStatus expectedECFileStatus = dfs.getFileStatus(ecFile);
    FileStatus actualECFileStatus = webHdfs.getFileStatus(ecFile);
    Assert.assertEquals(expectedECFileStatus.isErasureCoded(),
        actualECFileStatus.isErasureCoded());
    ContractTestUtils.assertErasureCoded(dfs, ecFile);
    assertTrue(
        ecFile + " should have erasure coding set in "
            + "FileStatus#toString(): " + actualECFileStatus,
        actualECFileStatus.toString().contains("isErasureCoded=true"));

    FileStatus expectedNormalDirStatus = dfs.getFileStatus(normalDir);
    FileStatus actualNormalDirStatus = webHdfs.getFileStatus(normalDir);
    Assert.assertEquals(expectedNormalDirStatus.isErasureCoded(),
        actualNormalDirStatus.isErasureCoded());
    ContractTestUtils.assertNotErasureCoded(dfs, normalDir);
    assertTrue(
        normalDir + " should have erasure coding unset in "
            + "FileStatus#toString(): " + actualNormalDirStatus,
        actualNormalDirStatus.toString().contains("isErasureCoded=false"));

    FileStatus expectedNormalFileStatus = dfs.getFileStatus(normalFile);
    // Query the file itself; this previously fetched normalDir again, so
    // the WebHDFS status of the plain file was never actually checked.
    FileStatus actualNormalFileStatus = webHdfs.getFileStatus(normalFile);
    Assert.assertEquals(expectedNormalFileStatus.isErasureCoded(),
        actualNormalFileStatus.isErasureCoded());
    ContractTestUtils.assertNotErasureCoded(dfs, normalFile);
    assertTrue(
        normalFile + " should have erasure coding unset in "
            + "FileStatus#toString(): " + actualNormalFileStatus,
        actualNormalFileStatus.toString().contains("isErasureCoded=false"));
  }

  /**
   * Creates snapshots through WebHdfs: the attempt must fail while the
   * directory is not snapshottable, and succeed (with and without an
   * explicit snapshot name) once snapshots are allowed.
   */
  @Test
  public void testWebHdfsCreateSnapshot() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    final Path snapshotDir = new Path("/foo");
    dfs.mkdirs(snapshotDir);

    try {
      webHdfs.createSnapshot(snapshotDir);
      fail("Cannot create snapshot on a non-snapshottable directory");
    } catch (Exception expectedFailure) {
      GenericTestUtils.assertExceptionContains(
          "Directory is not a snapshottable directory", expectedFailure);
    }

    // allow snapshots on /foo
    dfs.allowSnapshot(snapshotDir);
    // named snapshot created via WebHdfs
    webHdfs.createSnapshot(snapshotDir, "s1");
    // snapshot created without specifying a name
    final Path unnamedSnapshot = webHdfs.createSnapshot(snapshotDir, null);

    assertTrue(webHdfs.exists(unnamedSnapshot));
    final Path namedSnapshot =
        SnapshotTestHelper.getSnapshotRoot(snapshotDir, "s1");
    assertTrue(webHdfs.exists(namedSnapshot));
  }

  /**
   * Deletes snapshots through WebHdfs: a null snapshot name is rejected
   * with a RemoteException, and both existing snapshots can be removed.
   */
  @Test
  public void testWebHdfsDeleteSnapshot() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    final Path snapshotDir = new Path("/foo");
    dfs.mkdirs(snapshotDir);
    dfs.allowSnapshot(snapshotDir);

    webHdfs.createSnapshot(snapshotDir, "s1");
    final Path unnamedSnapshot = webHdfs.createSnapshot(snapshotDir, null);
    assertTrue(webHdfs.exists(unnamedSnapshot));
    final Path namedSnapshot =
        SnapshotTestHelper.getSnapshotRoot(snapshotDir, "s1");
    assertTrue(webHdfs.exists(namedSnapshot));

    // delete operation snapshot name as null
    try {
      webHdfs.deleteSnapshot(snapshotDir, null);
      fail("Expected IllegalArgumentException");
    } catch (RemoteException remote) {
      final String expectedMessage = "Required param snapshotname for "
          + "op: DELETESNAPSHOT is null or empty";
      assertEquals(expectedMessage, remote.getLocalizedMessage());
    }

    // delete the two snapshots
    webHdfs.deleteSnapshot(snapshotDir, "s1");
    assertFalse(webHdfs.exists(namedSnapshot));
    webHdfs.deleteSnapshot(snapshotDir, unnamedSnapshot.getName());
    assertFalse(webHdfs.exists(unnamedSnapshot));
  }

  /**
   * Test snapshot diff through WebHdfs.
   *
   * Takes snapshot s1, then creates, appends to, deletes and renames files
   * before taking s2, and checks that the diff report fetched through
   * WebHdfs contains exactly the five matching entries. The listing limit
   * is set to 2, which is smaller than the number of expected entries.
   */
  @Test
  public void testWebHdfsSnapshotDiff() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    conf.setInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT, 2);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    final Path foo = new Path("/foo");
    dfs.mkdirs(foo);
    Path file0 = new Path(foo, "file0");
    DFSTestUtil.createFile(dfs, file0, 100, (short) 1, 0);
    Path file1 = new Path(foo, "file1");
    DFSTestUtil.createFile(dfs, file1, 100, (short) 1, 0);
    Path file2 = new Path(foo, "file2");
    DFSTestUtil.createFile(dfs, file2, 100, (short) 1, 0);

    dfs.allowSnapshot(foo);
    webHdfs.createSnapshot(foo, "s1");
    final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1");
    Assert.assertTrue(webHdfs.exists(s1path));

    // Changes between s1 and s2: one create, one append, one delete and
    // one rename.
    Path file3 = new Path(foo, "file3");
    DFSTestUtil.createFile(dfs, file3, 100, (short) 1, 0);
    DFSTestUtil.appendFile(dfs, file0, 100);
    dfs.delete(file1, false);
    Path file4 = new Path(foo, "file4");
    dfs.rename(file2, file4);

    webHdfs.createSnapshot(foo, "s2");
    SnapshotDiffReport diffReport =
        webHdfs.getSnapshotDiffReport(foo, "s1", "s2");

    Assert.assertEquals("/foo", diffReport.getSnapshotRoot());
    Assert.assertEquals("s1", diffReport.getFromSnapshot());
    Assert.assertEquals("s2", diffReport.getLaterSnapshotName());

    final DiffReportEntry[] expectedEntries = {
        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("file0")),
        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("file1")),
        new DiffReportEntry(DiffType.RENAME,
            DFSUtil.string2Bytes("file2"), DFSUtil.string2Bytes("file4")),
        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("file3")),
    };
    for (DiffReportEntry expectedEntry : expectedEntries) {
      // Name the missing entry so a failure is diagnosable from the log.
      Assert.assertTrue("Missing diff entry: " + expectedEntry,
          diffReport.getDiffList().contains(expectedEntry));
    }
    // JUnit argument order is (expected, actual).
    Assert.assertEquals(expectedEntries.length,
        diffReport.getDiffList().size());

    // Test with fromSnapshot and toSnapshot as null.
    diffReport = webHdfs.getSnapshotDiffReport(foo, null, "s2");
    Assert.assertEquals(0, diffReport.getDiffList().size());
    diffReport = webHdfs.getSnapshotDiffReport(foo, "s1", null);
    Assert.assertEquals(5, diffReport.getDiffList().size());
  }

  /**
   * Test snapshottable directory list through WebHdfs.
   *
   * <p>Before any directory allows snapshots the WebHDFS listing must be
   * null. After /foo and /bar allow snapshots, the WebHDFS listing must have
   * the same number of entries as the DistributedFileSystem listing and every
   * entry must agree field by field.
   */
  @Test
  public void testWebHdfsSnapshottableDirectoryList() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final Path foo = new Path("/foo");
    final Path bar = new Path("/bar");
    dfs.mkdirs(foo);
    dfs.mkdirs(bar);
    SnapshottableDirectoryStatus[] statuses =
        webHdfs.getSnapshottableDirectoryList();
    Assert.assertNull(statuses);
    dfs.allowSnapshot(foo);
    dfs.allowSnapshot(bar);
    Path file0 = new Path(foo, "file0");
    DFSTestUtil.createFile(dfs, file0, 100, (short) 1, 0);
    Path file1 = new Path(bar, "file1");
    DFSTestUtil.createFile(dfs, file1, 100, (short) 1, 0);
    statuses = webHdfs.getSnapshottableDirectoryList();
    SnapshottableDirectoryStatus[] dfsStatuses =
        dfs.getSnapshottableDirListing();

    // Without the length check the loop below would silently ignore any
    // extra entries returned over WebHDFS.
    Assert.assertNotNull(statuses);
    Assert.assertNotNull(dfsStatuses);
    Assert.assertEquals(dfsStatuses.length, statuses.length);

    // The DFS listing is the expected value; the WebHDFS listing is the
    // value under test.
    for (int i = 0; i < dfsStatuses.length; i++) {
      Assert.assertEquals(dfsStatuses[i].getSnapshotNumber(),
          statuses[i].getSnapshotNumber());
      Assert.assertEquals(dfsStatuses[i].getSnapshotQuota(),
          statuses[i].getSnapshotQuota());
      Assert.assertArrayEquals(dfsStatuses[i].getParentFullPath(),
          statuses[i].getParentFullPath());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getChildrenNum(),
          statuses[i].getDirStatus().getChildrenNum());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getModificationTime(),
          statuses[i].getDirStatus().getModificationTime());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().isDir(),
          statuses[i].getDirStatus().isDir());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getAccessTime(),
          statuses[i].getDirStatus().getAccessTime());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getPermission(),
          statuses[i].getDirStatus().getPermission());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getOwner(),
          statuses[i].getDirStatus().getOwner());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getGroup(),
          statuses[i].getDirStatus().getGroup());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getPath(),
          statuses[i].getDirStatus().getPath());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().getFileId(),
          statuses[i].getDirStatus().getFileId());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().hasAcl(),
          statuses[i].getDirStatus().hasAcl());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().isEncrypted(),
          statuses[i].getDirStatus().isEncrypted());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().isErasureCoded(),
          statuses[i].getDirStatus().isErasureCoded());
      Assert.assertEquals(dfsStatuses[i].getDirStatus().isSnapshotEnabled(),
          statuses[i].getDirStatus().isSnapshotEnabled());
    }
  }

  /**
   * Test snapshot listing through WebHdfs.
   *
   * <p>Creates snapshots s1 and s2 of /foo, deletes s2, and checks that the
   * listing returned over WebHDFS has the same number of entries as the one
   * returned by DistributedFileSystem and agrees with it field by field.
   */
  @Test
  public void testWebHdfsSnapshotList() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      final DistributedFileSystem dfs = cluster.getFileSystem();
      final WebHdfsFileSystem webHdfs = WebHdfsTestUtil
          .getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
      final Path foo = new Path("/foo");
      dfs.mkdirs(foo);
      dfs.allowSnapshot(foo);
      webHdfs.createSnapshot(foo, "s1");
      webHdfs.createSnapshot(foo, "s2");
      webHdfs.deleteSnapshot(foo, "s2");
      SnapshotStatus[] statuses = webHdfs.getSnapshotListing(foo);
      SnapshotStatus[] dfsStatuses = dfs.getSnapshotListing(foo);

      // Without the length check the loop below would silently ignore any
      // extra entries returned over WebHDFS.
      Assert.assertNotNull(statuses);
      Assert.assertNotNull(dfsStatuses);
      Assert.assertEquals(dfsStatuses.length, statuses.length);

      // The DFS listing is the expected value; the WebHDFS listing is the
      // value under test.
      for (int i = 0; i < dfsStatuses.length; i++) {
        Assert.assertEquals(dfsStatuses[i].getSnapshotID(),
            statuses[i].getSnapshotID());
        Assert.assertEquals(dfsStatuses[i].isDeleted(),
            statuses[i].isDeleted());
        Assert.assertArrayEquals(dfsStatuses[i].getParentFullPath(),
            statuses[i].getParentFullPath());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getChildrenNum(),
            statuses[i].getDirStatus().getChildrenNum());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getModificationTime(),
            statuses[i].getDirStatus().getModificationTime());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().isDir(),
            statuses[i].getDirStatus().isDir());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getAccessTime(),
            statuses[i].getDirStatus().getAccessTime());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getPermission(),
            statuses[i].getDirStatus().getPermission());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getOwner(),
            statuses[i].getDirStatus().getOwner());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getGroup(),
            statuses[i].getDirStatus().getGroup());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getPath(),
            statuses[i].getDirStatus().getPath());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().getFileId(),
            statuses[i].getDirStatus().getFileId());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().hasAcl(),
            statuses[i].getDirStatus().hasAcl());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().isEncrypted(),
            statuses[i].getDirStatus().isEncrypted());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().isErasureCoded(),
            statuses[i].getDirStatus().isErasureCoded());
        Assert.assertEquals(dfsStatuses[i].getDirStatus().isSnapshotEnabled(),
            statuses[i].getDirStatus().isSnapshotEnabled());
      }
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }


  /**
   * Runs the shared non-recursive file creation checks from
   * {@code TestFileCreation} against a WebHDFS file system, closing the
   * file system afterwards.
   */
  @Test
  public void testWebHdfsCreateNonRecursive() throws IOException, URISyntaxException {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).build();
    cluster.waitActive();

    final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    try {
      TestFileCreation.testFileCreationNonRecursive(fs);
    } finally {
      if (fs != null) {
        fs.close();
      }
    }
  }
  /**
   * Test snapshot rename through WebHdfs.
   *
   * <p>Verifies that a rename with a null old snapshot name is rejected with
   * a RemoteException, and that a valid rename makes the snapshot reachable
   * under its new name only.
   */
  @Test
  public void testWebHdfsRenameSnapshot() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final FileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    final Path snapshotDir = new Path("/foo");
    dfs.mkdirs(snapshotDir);
    dfs.allowSnapshot(snapshotDir);

    webFs.createSnapshot(snapshotDir, "s1");
    final Path oldSnapshotRoot =
        SnapshotTestHelper.getSnapshotRoot(snapshotDir, "s1");
    Assert.assertTrue(webFs.exists(oldSnapshotRoot));

    // A missing old snapshot name must be rejected by the server.
    try {
      webFs.renameSnapshot(snapshotDir, null, "s2");
      fail("Expected IllegalArgumentException");
    } catch (RemoteException rejected) {
      final String expectedMessage = "Required param oldsnapshotname for "
          + "op: RENAMESNAPSHOT is null or empty";
      Assert.assertEquals(expectedMessage, rejected.getLocalizedMessage());
    }

    // A valid rename: s1 disappears and s2 becomes visible.
    webFs.renameSnapshot(snapshotDir, "s1", "s2");
    assertFalse(webFs.exists(oldSnapshotRoot));
    final Path newSnapshotRoot =
        SnapshotTestHelper.getSnapshotRoot(snapshotDir, "s2");
    Assert.assertTrue(webFs.exists(newSnapshotRoot));

    // Clean up the renamed snapshot and confirm it is gone.
    webFs.deleteSnapshot(snapshotDir, "s2");
    assertFalse(webFs.exists(newSnapshotRoot));
  }

  /**
   * Make sure a RetriableException is thrown when rpcServer is null in
   * NamenodeWebHdfsMethods.
   *
   * <p>The NameNode's {@code rpcServer} field is temporarily nulled out via
   * reflection and is always restored afterwards, even when the assertions
   * fail, so the NameNode is not left with a null RPC server.
   */
  @Test
  public void testRaceWhileNNStartup() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final NameNode namenode = cluster.getNameNode();
    final NamenodeProtocols rpcServer = namenode.getRpcServer();
    Whitebox.setInternalState(namenode, "rpcServer", null);
    try {
      final Path foo = new Path("/foo");
      final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
          WebHdfsConstants.WEBHDFS_SCHEME);
      try {
        webHdfs.mkdirs(foo);
        fail("Expected RetriableException");
      } catch (RetriableException e) {
        GenericTestUtils.assertExceptionContains("Namenode is in startup mode",
            e);
      }
    } finally {
      // Restore the real RPC server regardless of the test outcome.
      Whitebox.setInternalState(namenode, "rpcServer", rpcServer);
    }
  }

  /**
   * With fallback to simple authentication allowed, requesting a delegation
   * token over WebHDFS is expected to return null rather than throw.
   */
  @Test
  public void testDTInInsecureClusterWithFallback()
      throws IOException, URISyntaxException {
    final Configuration fallbackConf = WebHdfsTestUtil.createConf();
    fallbackConf.setBoolean(
        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
        true);
    cluster = new MiniDFSCluster.Builder(fallbackConf).numDataNodes(0).build();
    final FileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(
        fallbackConf, WebHdfsConstants.WEBHDFS_SCHEME);
    Assert.assertNull(webFs.getDelegationToken(null));
  }

  /**
   * With the default test configuration, requesting a delegation token over
   * WebHDFS must fail with an AccessControlException whose message starts
   * with {@code WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG}.
   */
  @Test
  public void testDTInInsecureCluster() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
      final FileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
          WebHdfsConstants.WEBHDFS_SCHEME);
      webFs.getDelegationToken(null);
      fail("No exception is thrown.");
    } catch (AccessControlException denied) {
      final String message = denied.getMessage();
      Assert.assertTrue(
          message.startsWith(WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG));
    }
  }

  /**
   * Checks that an OPEN request carrying explicit offset and length
   * parameters returns exactly that slice of the file, both in the reported
   * content length and in the bytes delivered.
   */
  @Test
  public void testWebHdfsOffsetAndLength() throws Exception{
    final Configuration conf = WebHdfsTestUtil.createConf();
    final int offset = 42;
    final int length = 512;
    final String path = "/foo";
    final byte[] contents = new byte[1024];
    RANDOM.nextBytes(contents);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    try (OutputStream os = fs.create(new Path(path))) {
      os.write(contents);
    }
    InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
    URL url = new URL("http", addr.getHostString(), addr.getPort(),
        WebHdfsFileSystem.PATH_PREFIX + path + "?op=OPEN"
            + Param.toSortedString("&", new OffsetParam((long) offset),
                new LengthParam((long) length)));
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setInstanceFollowRedirects(true);
      Assert.assertEquals(length, conn.getContentLength());
      byte[] subContents = new byte[length];
      byte[] realContents = new byte[length];
      System.arraycopy(contents, offset, subContents, 0, length);
      // Close the response stream even if the read fails part-way.
      try (InputStream in = conn.getInputStream()) {
        IOUtils.readFully(in, realContents);
      }
      Assert.assertArrayEquals(subContents, realContents);
    } finally {
      // Release the connection, as getResponse() does for its own requests.
      conn.disconnect();
    }
  }

  /**
   * Checks that a per-storage-type quota set through DistributedFileSystem is
   * reported in the ContentSummary fetched over WebHDFS.
   */
  @Test
  public void testContentSummary() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final Path path = new Path("/QuotaDir");
    final long diskQuota = 100000;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final DistributedFileSystem dfs = cluster.getFileSystem();
    dfs.mkdirs(path);
    dfs.setQuotaByStorageType(path, StorageType.DISK, diskQuota);
    ContentSummary contentSummary = webHdfs.getContentSummary(path);
    // assertEquals reports the actual quota on failure, unlike assertTrue.
    Assert.assertEquals(diskQuota,
        contentSummary.getTypeQuota(StorageType.DISK));
  }

  /**
   * Test Snapshot related information in ContentSummary.
   *
   * <p>A 10-byte file with replication 3 is snapshotted and then deleted; the
   * summary fetched over WebHDFS must report the snapshot file count, length
   * and space consumed, and must equal the summary from
   * DistributedFileSystem.
   */
  @Test
  public void testSnapshotInContentSummary() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final Path snapshotDir = new Path("/dir");
    final Path snapshottedFile = new Path("/dir/file");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    final WebHdfsFileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final DistributedFileSystem dfs = cluster.getFileSystem();

    // The file survives only inside the snapshot once it has been deleted.
    DFSTestUtil.createFile(dfs, snapshottedFile, 10, (short) 3, 0L);
    dfs.allowSnapshot(snapshotDir);
    dfs.createSnapshot(snapshotDir);
    dfs.delete(snapshottedFile, true);

    final ContentSummary webSummary = webFs.getContentSummary(snapshotDir);
    assertEquals(1, webSummary.getSnapshotFileCount());
    assertEquals(10, webSummary.getSnapshotLength());
    assertEquals(30, webSummary.getSnapshotSpaceConsumed());
    assertEquals(dfs.getContentSummary(snapshotDir),
        webFs.getContentSummary(snapshotDir));
  }

  /**
   * Checks that namespace, space and storage-type quotas set through
   * DistributedFileSystem, together with the resulting usage, are reported
   * correctly by getQuotaUsage over WebHDFS.
   */
  @Test
  public void testQuotaUsage() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final Path path = new Path("/TestDir");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final DistributedFileSystem dfs = cluster.getFileSystem();

    final long nsQuota = 100;
    // Long arithmetic so the product can never overflow an int.
    final long spaceQuota = 600L * 1024 * 1024;
    final long diskQuota = 100000;
    final byte[] bytes = {0x0, 0x1, 0x2, 0x3 };

    dfs.mkdirs(path);
    dfs.setQuota(path, nsQuota, spaceQuota);
    for (int i = 0; i < 10; i++) {
      dfs.createNewFile(new Path(path, "test_file_" + i));
    }
    // try-with-resources closes the stream even if the write fails.
    try (FSDataOutputStream out = dfs.create(new Path(path, "test_file"))) {
      out.write(bytes);
    }

    dfs.setQuotaByStorageType(path, StorageType.DISK, diskQuota);

    QuotaUsage quotaUsage = webHdfs.getQuotaUsage(path);
    // 10 empty files + 1 written file + the directory itself.
    assertEquals(12, quotaUsage.getFileAndDirectoryCount());
    assertEquals(nsQuota, quotaUsage.getQuota());
    assertEquals(bytes.length * dfs.getDefaultReplication(),
        quotaUsage.getSpaceConsumed());
    assertEquals(spaceQuota, quotaUsage.getSpaceQuota());
    assertEquals(diskQuota, quotaUsage.getTypeQuota(StorageType.DISK));
  }

  /**
   * Checks that quotas set and reset over WebHDFS are visible through
   * DistributedFileSystem, and that invalid quota arguments are rejected with
   * IllegalArgumentException.
   */
  @Test
  public void testSetQuota() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final Path path = new Path("/TestDir");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final DistributedFileSystem dfs = cluster.getFileSystem();

    final long nsQuota = 100;
    final long spaceQuota = 1024;

    webHdfs.mkdirs(path);

    // Set namespace and space quotas.
    webHdfs.setQuota(path, nsQuota, spaceQuota);
    QuotaUsage quotaUsage = dfs.getQuotaUsage(path);
    assertEquals(nsQuota, quotaUsage.getQuota());
    assertEquals(spaceQuota, quotaUsage.getSpaceQuota());

    // Reset both quotas.
    webHdfs.setQuota(path, HdfsConstants.QUOTA_RESET,
        HdfsConstants.QUOTA_RESET);
    quotaUsage = dfs.getQuotaUsage(path);
    assertEquals(HdfsConstants.QUOTA_RESET, quotaUsage.getQuota());
    assertEquals(HdfsConstants.QUOTA_RESET, quotaUsage.getSpaceQuota());

    // Set a quota for each storage type this test expects to be accepted.
    webHdfs.setQuotaByStorageType(path, StorageType.DISK, spaceQuota);
    webHdfs.setQuotaByStorageType(path, StorageType.ARCHIVE, spaceQuota);
    webHdfs.setQuotaByStorageType(path, StorageType.SSD, spaceQuota);
    webHdfs.setQuotaByStorageType(path, StorageType.NVDIMM, spaceQuota);
    quotaUsage = dfs.getQuotaUsage(path);
    assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.DISK));
    assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.ARCHIVE));
    assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.SSD));
    assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.NVDIMM));

    // Test invalid parameters. The negative SSD quota case used to be
    // listed twice verbatim; it is checked once here.

    LambdaTestUtils.intercept(IllegalArgumentException.class,
        () -> webHdfs.setQuota(path, -100, 100));
    LambdaTestUtils.intercept(IllegalArgumentException.class,
        () -> webHdfs.setQuota(path, 100, -100));
    LambdaTestUtils.intercept(IllegalArgumentException.class,
        () -> webHdfs.setQuotaByStorageType(path, StorageType.SSD, -100));
    LambdaTestUtils.intercept(IllegalArgumentException.class,
        () -> webHdfs.setQuotaByStorageType(path, null, 100));
    LambdaTestUtils.intercept(IllegalArgumentException.class,
        () -> webHdfs.setQuotaByStorageType(path, StorageType.RAM_DISK, 100));
    LambdaTestUtils.intercept(IllegalArgumentException.class,
        () -> webHdfs.setQuotaByStorageType(path, StorageType.NVDIMM, -100));
  }


  /**
   * Exercises positional reads on a WebHDFS input stream: a positional
   * readFully past the end of the file must raise EOFException, and
   * positional reads interleaved with stateful reads must not disturb the
   * stateful read, which still has to deliver the whole file.
   */
  @Test
  public void testWebHdfsPread() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    final byte[] expected = new byte[1024];
    RANDOM.nextBytes(expected);
    final Path file = new Path("/foo");
    FSDataInputStream stream = null;
    try {
      final WebHdfsFileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(
          conf, WebHdfsConstants.WEBHDFS_SCHEME);
      try (OutputStream out = webFs.create(file)) {
        out.write(expected);
      }

      // Positional read of 5 bytes starting at 1020 runs past the 1024-byte
      // file and must fail.
      stream = webFs.open(file, 1024);
      final byte[] actual = new byte[1024];
      try {
        stream.readFully(1020, actual, 0, 5);
        Assert.fail("EOF expected");
      } catch (EOFException ignored) {}

      // Interleave positional reads with the stateful read of the file.
      final int firstChunk = stream.read(actual, 0, 512);
      stream.readFully(100, new byte[1024], 0, 100);
      final int positionalLen = stream.read(200, new byte[1024], 0, 200);
      Assert.assertTrue(positionalLen > 0);
      IOUtils.readFully(stream, actual, firstChunk, 1024 - firstChunk);
      Assert.assertArrayEquals(expected, actual);
    } finally {
      if (stream != null) {
        stream.close();
      }
    }
  }

  /**
   * Checks that the home directory reported over WebHDFS matches the one
   * reported by DistributedFileSystem for the current user, and differs from
   * it when WebHDFS is accessed as the test user "XXX".
   */
  @Test(timeout = 30000)
  public void testGetHomeDirectory() throws Exception {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster.Builder(conf).build();
    cluster.waitActive();
    DistributedFileSystem hdfs = cluster.getFileSystem();

    final URI uri = new URI(WebHdfsConstants.WEBHDFS_SCHEME + "://"
        + cluster.getHttpUri(0).replace("http://", ""));
    final Configuration confTemp = new Configuration();

    // try-with-resources closes each file system even if an assertion fails.
    try (WebHdfsFileSystem webhdfs =
        (WebHdfsFileSystem) FileSystem.get(uri, confTemp)) {
      assertEquals(hdfs.getHomeDirectory().toUri().getPath(),
          webhdfs.getHomeDirectory().toUri().getPath());
    }

    try (WebHdfsFileSystem webhdfs =
        createWebHDFSAsTestUser(confTemp, uri, "XXX")) {
      assertNotEquals(hdfs.getHomeDirectory().toUri().getPath(),
          webhdfs.getHomeDirectory().toUri().getPath());
    }
  }

  /**
   * Checks that every block location returned over WebHDFS carries storage
   * types and that the first one is DISK.
   */
  @Test
  public void testWebHdfsGetBlockLocationsWithStorageType() throws Exception{
    final Configuration conf = WebHdfsTestUtil.createConf();
    final int rangeStart = 42;
    final int rangeLength = 512;
    final Path file = new Path("/foo");
    final byte[] payload = new byte[1024];
    RANDOM.nextBytes(payload);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    final WebHdfsFileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    try (OutputStream out = webFs.create(file)) {
      out.write(payload);
    }
    final BlockLocation[] blocks =
        webFs.getFileBlockLocations(file, rangeStart, rangeLength);
    for (BlockLocation block : blocks) {
      final StorageType[] types = block.getStorageTypes();
      final boolean firstIsDisk = types != null && types.length > 0
          && types[0] == StorageType.DISK;
      Assert.assertTrue(firstIsDisk);
    }
  }

  /**
   * Compares the block locations returned by the file system API with those
   * returned by the GETFILEBLOCKLOCATIONS REST operation for five parameter
   * combinations: no range, offset and length, length only, offset only, and
   * an offset beyond the end of the file.
   */
  @Test
  public void testWebHdfsGetBlockLocations() throws Exception{
    final Configuration conf = WebHdfsTestUtil.createConf();
    final int offset = 42;
    final int length = 512;
    final Path path = new Path("/foo");
    final byte[] payload = new byte[1024];
    RANDOM.nextBytes(payload);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    try (OutputStream out = fs.create(path)) {
      out.write(payload);
    }
    final BlockLocation[] expected =
        fs.getFileBlockLocations(path, offset, length);

    // All REST queries below go to the NameNode HTTP endpoint and share the
    // same path and operation; only the range parameters differ.
    final InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
    final String host = addr.getHostString();
    final int port = addr.getPort();
    final String opQuery =
        WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS";

    // Case 1: neither length nor offset is given.
    final URL noRangeUrl = new URL("http", host, port, opQuery);
    // The JSON response is parsed back into a BlockLocation array and must
    // match the result of the file system API.
    final BlockLocation[] noRange =
        toBlockLocationArray(getResponse(noRangeUrl, "GET"));
    verifyEquals(expected, noRange);

    // Case 2: both length and offset are given.
    final URL bothUrl = new URL("http", host, port,
        opQuery + "&length=" + length + "&offset=" + offset);
    final BlockLocation[] both =
        toBlockLocationArray(getResponse(bothUrl, "GET"));
    verifyEquals(expected, both);

    // Case 3: length is given, offset is not.
    final URL lengthOnlyUrl = new URL("http", host, port,
        opQuery + "&length=" + length);
    final BlockLocation[] lengthOnly =
        toBlockLocationArray(getResponse(lengthOnlyUrl, "GET"));
    verifyEquals(expected, lengthOnly);

    // Case 4: offset is given, length is not.
    final URL offsetOnlyUrl = new URL("http", host, port,
        opQuery + "&offset=" + offset);
    final BlockLocation[] offsetOnly =
        toBlockLocationArray(getResponse(offsetOnlyUrl, "GET"));
    verifyEquals(expected, offsetOnly);

    // Case 5: the offset lies beyond the end of the file, so an empty array
    // of BlockLocation is expected.
    final URL pastEndUrl = new URL("http", host, port,
        opQuery + "&offset=1200");
    final BlockLocation[] pastEnd =
        toBlockLocationArray(getResponse(pastEndUrl, "GET"));
    verifyEquals(new BlockLocation[] {}, pastEnd);
  }

  /**
   * Parses a GETFILEBLOCKLOCATIONS JSON response into an array of
   * BlockLocation. The array is read from the "BlockLocation" entry nested
   * inside the top-level "BlockLocations" object.
   *
   * @param json the JSON response body
   * @return the block locations contained in the response
   * @throws IOException if the JSON cannot be parsed
   */
  private BlockLocation[] toBlockLocationArray(String json)
      throws IOException {
    final ObjectMapper mapper = new ObjectMapper();
    // Inner object: maps a name to the array of block locations.
    final MapType innerType = mapper.getTypeFactory().constructMapType(
        Map.class, String.class, BlockLocation[].class);
    // Outer object: maps a name to the inner object.
    final MapType outerType = mapper.getTypeFactory().constructMapType(
        Map.class,
        mapper.constructType(String.class),
        mapper.constructType(innerType));

    final Map<String, Map<String, BlockLocation[]>> root =
        mapper.readValue(json, outerType);
    final Map<String, BlockLocation[]> wrapper = root.get("BlockLocations");
    return wrapper.get(BlockLocation.class.getSimpleName());
  }

  /**
   * Asserts that two arrays of block locations have the same number of
   * elements and that corresponding elements agree on length, offset, cached
   * hosts, hosts, names, topology paths and storage types.
   *
   * <p>The length comparison matters: without it, an empty expected array
   * would match any actual array because the element loop would never run.
   *
   * @param locations1 the expected block locations
   * @param locations2 the actual block locations
   * @throws IOException if reading a location's hosts, names or topology
   *     paths fails
   */
  private void verifyEquals(BlockLocation[] locations1,
      BlockLocation[] locations2) throws IOException {
    Assert.assertEquals("Number of block locations differs",
        locations1.length, locations2.length);
    for (int i = 0; i < locations1.length; i++) {
      BlockLocation location1 = locations1[i];
      BlockLocation location2 = locations2[i];
      Assert.assertEquals(location1.getLength(),
          location2.getLength());
      Assert.assertEquals(location1.getOffset(),
          location2.getOffset());
      Assert.assertArrayEquals(location1.getCachedHosts(),
          location2.getCachedHosts());
      Assert.assertArrayEquals(location1.getHosts(),
          location2.getHosts());
      Assert.assertArrayEquals(location1.getNames(),
          location2.getNames());
      Assert.assertArrayEquals(location1.getTopologyPaths(),
          location2.getTopologyPaths());
      Assert.assertArrayEquals(location1.getStorageTypes(),
          location2.getStorageTypes());
    }
  }

  /**
   * Sends one HTTP request without following redirects and returns the
   * response body decoded as UTF-8. The connection is disconnected before
   * returning, whether or not the request succeeded.
   *
   * @param url the URL to request
   * @param httpRequestType the HTTP method, e.g. "GET"
   * @return the response body
   * @throws IOException if the request fails or the body cannot be read
   */
  private static String getResponse(URL url, String httpRequestType)
      throws IOException {
    final HttpURLConnection connection =
        (HttpURLConnection) url.openConnection();
    try {
      connection.setRequestMethod(httpRequestType);
      connection.setInstanceFollowRedirects(false);
      final InputStream body = connection.getInputStream();
      return IOUtils.toString(body, StandardCharsets.UTF_8);
    } finally {
      connection.disconnect();
    }
  }

  /**
   * Obtains a WebHdfsFileSystem for the given URI while running as a test
   * user that belongs to the "supergroup" group.
   *
   * @param conf the configuration passed to FileSystem.get
   * @param uri the WebHDFS URI to connect to
   * @param userName the name of the test user to create
   * @return the file system obtained as that user
   * @throws Exception if the user cannot be created or the file system
   *     cannot be obtained
   */
  private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
      final URI uri, final String userName) throws Exception {
    final String[] groups = {"supergroup"};
    final UserGroupInformation testUser =
        UserGroupInformation.createUserForTesting(userName, groups);

    // Typed explicitly so that doAs resolves to the
    // PrivilegedExceptionAction overload.
    final PrivilegedExceptionAction<WebHdfsFileSystem> openFileSystem =
        () -> (WebHdfsFileSystem) FileSystem.get(uri, conf);
    return testUser.doAs(openFileSystem);
  }

  /**
   * Checks the read path of WebHdfsInputStream: a file can be read to its
   * end, reading after close fails, and the retry policy is consulted (or
   * not) depending on the type of exception thrown by the underlying stream.
   */
  @Test(timeout=90000)
  public void testWebHdfsReadRetries() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final Path dir = new Path("/testWebHdfsReadRetries");

    conf.setBoolean(HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, true);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

    final short numDatanodes = 1;
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numDatanodes)
        .build();
    cluster.waitActive();
    final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    // create a file
    final long length = 1L << 20;
    final Path file1 = new Path(dir, "testFile");

    DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);

    // get file status and check that it was written properly.
    final FileStatus s1 = fs.getFileStatus(file1);
    assertEquals("Write failed for file " + file1, length, s1.getLen());

    // Ensure file can be read through WebHdfsInputStream
    FSDataInputStream in = fs.open(file1);
    assertTrue("Input stream is not an instance of class WebHdfsInputStream",
        in.getWrappedStream() instanceof WebHdfsInputStream);
    int count = 0;
    while (in.read() != -1) {
      count++;
    }
    assertEquals("Read failed for file " + file1, s1.getLen(), count);
    assertEquals("Should not be able to read beyond end of file", -1,
        in.read());
    in.close();
    try {
      in.read();
      fail("Read after close should have failed");
    } catch (IOException expected) {
      // Reading from a closed stream is the failure this test wants.
    }

    WebHdfsFileSystem wfs = (WebHdfsFileSystem) fs;
    // Read should not be retried if AccessControlException is encountered.
    String msg = "ReadRetries: Test Access Control Exception";
    testReadRetryExceptionHelper(wfs, file1, new AccessControlException(msg),
        msg, false, 1);

    // Retry policy should be invoked if IOExceptions are thrown.
    msg = "ReadRetries: Test SocketTimeoutException";
    testReadRetryExceptionHelper(wfs, file1, new SocketTimeoutException(msg),
        msg, true, 5);
    msg = "ReadRetries: Test SocketException";
    testReadRetryExceptionHelper(wfs, file1, new SocketException(msg), msg,
        true, 5);
    msg = "ReadRetries: Test EOFException";
    testReadRetryExceptionHelper(wfs, file1, new EOFException(msg), msg, true,
        5);
    msg = "ReadRetries: Test Generic IO Exception";
    testReadRetryExceptionHelper(wfs, file1, new IOException(msg), msg, true,
        5);

    // If InvalidToken exception occurs, WebHdfs only retries if the
    // delegation token was replaced. Do that twice, then verify by checking
    // the number of times it tried.
    WebHdfsFileSystem spyfs = spy(wfs);
    when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
    msg = "ReadRetries: Test Invalid Token Exception";
    testReadRetryExceptionHelper(spyfs, file1, new InvalidToken(msg), msg,
        false, 3);
  }

  // Set to true by the retry policy installed in
  // testReadRetryExceptionHelper whenever shouldRetry is called.
  public boolean attemptedRetry;

  /**
   * Opens a file, makes its underlying input stream throw the given exception
   * on read, and verifies whether the file system's retry policy was
   * consulted and how many times a response was requested.
   *
   * @param fs the WebHDFS file system to read from; its retry policy is
   *     replaced by this method
   * @param fn the file to open
   * @param ex the exception the underlying stream should throw on read
   * @param msg text expected in the resulting exception message, also used as
   *     a prefix for assertion messages
   * @param shouldAttemptRetry whether the retry policy is expected to be
   *     consulted
   * @param numTimesTried expected number of getResponse calls on the read
   *     runner
   * @throws Exception if the setup or the verification fails
   */
  private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
      final IOException ex, String msg, boolean shouldAttemptRetry,
      int numTimesTried)
      throws Exception {
    // Override WebHdfsInputStream#getInputStream so that it returns
    // an input stream that throws the specified exception when read
    // is called.
    FSDataInputStream in = fs.open(fn);
    in.read(); // Connection is made only when the first read() occurs.
    final WebHdfsInputStream webIn =
        (WebHdfsInputStream)(in.getWrappedStream());

    final InputStream spyInputStream =
        spy(webIn.getReadRunner().getInputStream());
    doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
    final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
    doReturn(spyInputStream)
        .when(rr).initializeInputStream((HttpURLConnection) any());
    rr.setInputStream(spyInputStream);
    webIn.setReadRunner(rr);

    // Override filesystem's retry policy in order to verify that
    // WebHdfsInputStream is calling shouldRetry for the appropriate
    // exceptions.
    final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
    final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
    RetryPolicy rp = new RetryPolicy() {
      @Override
      public RetryAction shouldRetry(Exception e, int retries, int failovers,
          boolean isIdempotentOrAtMostOnce) throws Exception {
        attemptedRetry = true;
        // Give up once more than three retries have been made.
        if (retries > 3) {
          return failAction;
        } else {
          return retryAction;
        }
      }
    };
    fs.setRetryPolicy(rp);

    // If the retry logic is exercised, attemptedRetry will be true. Some
    // exceptions should exercise the retry logic and others should not.
    // Either way, the value of attemptedRetry should match shouldAttemptRetry.
    attemptedRetry = false;
    try {
      webIn.read();
      fail(msg + ": Read should have thrown exception.");
    } catch (Exception e) {
      assertTrue(e.getMessage().contains(msg));
    }
    // Expected value first so that a failure is reported the right way round.
    assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
                + "have called shouldRetry. ",
        shouldAttemptRetry, attemptedRetry);

    verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
    webIn.close();
    in.close();
  }

  /**
   * Sends a request to {@code url} with redirects disabled and asserts that
   * the response is a 200 whose JSON body has a "Location" entry. When the
   * request is a CREATE operation, additionally sends an OPTIONS request to
   * the returned location and asserts that it also answers 200.
   *
   * @param url request URL, including the {@code op=} query parameter
   * @param TYPE HTTP method to use (callers pass "PUT", "GET" or "POST")
   * @throws JSONException if the response body is not valid JSON
   * @throws IOException if a connection cannot be made or read
   */
  private void checkResponseContainsLocation(URL url, String TYPE)
    throws JSONException, IOException {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod(TYPE);
    conn.setInstanceFollowRedirects(false);
    String response =
        IOUtils.toString(conn.getInputStream(), StandardCharsets.UTF_8);
    LOG.info("Response was : " + response);
    Assert.assertEquals(
      "Response wasn't " + HttpURLConnection.HTTP_OK,
      HttpURLConnection.HTTP_OK, conn.getResponseCode());

    JSONObject responseJson = new JSONObject(response);
    Assert.assertTrue("Response didn't give us a location. " + response,
      responseJson.has("Location"));

    //Test that the DN allows CORS on Create
    // TYPE carries the HTTP method, never the WebHDFS operation name, so the
    // operation has to be recognised from the query string of the request.
    final String query = url.getQuery();
    if (query != null && query.contains("op=CREATE")) {
      URL dnLocation = new URL(responseJson.getString("Location"));
      HttpURLConnection dnConn = (HttpURLConnection) dnLocation.openConnection();
      dnConn.setRequestMethod("OPTIONS");
      Assert.assertEquals("Datanode url : " + dnLocation + " didn't allow "
        + "CORS", HttpURLConnection.HTTP_OK, dnConn.getResponseCode());
    }
  }

  /**
   * Test that when "&noredirect=true" is added to operations CREATE, APPEND,
   * OPEN, and GETFILECHECKSUM the response (which is usually a 307 temporary
   * redirect) is a 200 with JSON that contains the redirected location.
   */
  @Test
  public void testWebHdfsNoRedirect() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    LOG.info("Started cluster");

    // Pieces shared by every request URL built below.
    final InetSocketAddress nnHttpAddr = cluster.getNameNode().getHttpAddress();
    final String host = nnHttpAddr.getHostString();
    final int port = nnHttpAddr.getPort();
    final String noRedirect =
        Param.toSortedString("&", new NoRedirectParam(true));

    // CREATE is issued against a path of its own, before any file exists.
    final URL createUrl = new URL("http", host, port,
        WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirectCreate"
            + "?op=CREATE" + noRedirect);
    LOG.info("Sending create request " + createUrl);
    checkResponseContainsLocation(createUrl, "PUT");

    // Write a file that the remaining operations can target.
    final WebHdfsFileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final String filePath = "/testWebHdfsNoRedirect";
    final byte[] payload = new byte[1024];
    RANDOM.nextBytes(payload);
    try (OutputStream out = webFs.create(new Path(filePath))) {
      out.write(payload);
    }
    final String requestBase = WebHdfsFileSystem.PATH_PREFIX + filePath;

    final URL openUrl = new URL("http", host, port,
        requestBase + "?op=OPEN" + noRedirect);
    LOG.info("Sending open request " + openUrl);
    checkResponseContainsLocation(openUrl, "GET");

    final URL checksumUrl = new URL("http", host, port,
        requestBase + "?op=GETFILECHECKSUM" + noRedirect);
    LOG.info("Sending getfilechecksum request " + checksumUrl);
    checkResponseContainsLocation(checksumUrl, "GET");

    final URL appendUrl = new URL("http", host, port,
        requestBase + "?op=APPEND" + noRedirect);
    LOG.info("Sending append request " + appendUrl);
    checkResponseContainsLocation(appendUrl, "POST");
  }

  /**
   * Checks that the trash root WebHDFS reports for "/" is the current user's
   * trash directory under the user home prefix.
   */
  @Test
  public void testGetTrashRoot() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final String user = UserGroupInformation.getCurrentUser().getShortUserName();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);

    final Path userTrash = new Path(user, FileSystem.TRASH_PREFIX);
    final String expected =
        new Path(FileSystem.USER_HOME_PREFIX, userTrash).toUri().getPath();
    final String actual =
        webHdfs.getTrashRoot(new Path("/")).toUri().getPath();
    assertEquals(expected, actual);
  }

  /**
   * With "dfs.namenode.snapshot.trashroot.enabled" set, checks the trash root
   * of a directory before and after snapshots are allowed on it: first the
   * user's home trash, then a per-user trash inside the directory itself.
   */
  @Test
  public void testGetSnapshotTrashRoot() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
    final String user = UserGroupInformation.getCurrentUser().getShortUserName();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);
    final Path snapshotDir = new Path("/ssDir1");
    assertTrue(webHdfs.mkdirs(snapshotDir));

    // Before snapshots are allowed the regular home-directory trash applies.
    final Path trashBefore = webHdfs.getTrashRoot(snapshotDir);
    final Path homeTrash = new Path(FileSystem.USER_HOME_PREFIX,
        new Path(user, FileSystem.TRASH_PREFIX));
    assertEquals(homeTrash.toUri().getPath(), trashBefore.toUri().getPath());

    // Enable snapshot
    webHdfs.allowSnapshot(snapshotDir);
    final Path trashAfter = webHdfs.getTrashRoot(snapshotDir);
    final Path inDirTrash = new Path(snapshotDir,
        new Path(FileSystem.TRASH_PREFIX, user));
    assertEquals(inDirTrash.toUri().getPath(), trashAfter.toUri().getPath());

    // Cleanup
    webHdfs.disallowSnapshot(snapshotDir);
    webHdfs.delete(snapshotDir, true);
  }

  /**
   * Compares the trash roots reported by WebHDFS with those reported by
   * DistributedFileSystem for paths inside an encryption zone, outside it,
   * and for the root, and checks that WebHDFS reports the same trash root
   * for the zone directory itself as for the root and the outside path.
   */
  @Test
  public void testGetEZTrashRoot() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    FileSystemTestHelper fsHelper = new FileSystemTestHelper();
    File testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
        "jceks://file" + new Path(testRootDir.toString(), "test.jks").toUri());
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);
    HdfsAdmin dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
    dfs.getClient().setKeyProvider(
        cluster.getNameNode().getNamesystem().getProvider());
    final String testkey = "test_key";
    DFSTestUtil.createKey(testkey, cluster, conf);

    // Permission modes are written as octal literals: a decimal 700 or 755
    // would encode a different, unintended bit pattern.
    final FsPermission ownerOnly = new FsPermission((short) 0700);
    final FsPermission worldReadable = new FsPermission((short) 0755);

    final Path zone1 = new Path("/zone1");
    dfs.mkdirs(zone1, ownerOnly);
    dfsAdmin.createEncryptionZone(zone1, testkey,
        EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH));

    final Path insideEZ = new Path(zone1, "insideEZ");
    dfs.mkdirs(insideEZ, ownerOnly);
    assertEquals(
        dfs.getTrashRoot(insideEZ).toUri().getPath(),
        webhdfs.getTrashRoot(insideEZ).toUri().getPath());

    final Path outsideEZ = new Path("/outsideEZ");
    dfs.mkdirs(outsideEZ, worldReadable);
    assertEquals(
        dfs.getTrashRoot(outsideEZ).toUri().getPath(),
        webhdfs.getTrashRoot(outsideEZ).toUri().getPath());

    final Path root = new Path("/");
    assertEquals(
        dfs.getTrashRoot(root).toUri().getPath(),
        webhdfs.getTrashRoot(root).toUri().getPath());
    assertEquals(
        webhdfs.getTrashRoot(root).toUri().getPath(),
        webhdfs.getTrashRoot(zone1).toUri().getPath());
    assertEquals(
        webhdfs.getTrashRoot(outsideEZ).toUri().getPath(),
        webhdfs.getTrashRoot(zone1).toUri().getPath());
  }

  /**
   * Exercises listing, setting, getting and unsetting storage policies
   * through WebHDFS and cross-checks the results against
   * DistributedFileSystem.
   */
  @Test
  public void testStoragePolicy() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final Path file = new Path("/file");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    // Both file systems must list the same policies, in the same order.
    final Object[] policiesFromDfs = dfs.getAllStoragePolicies().toArray();
    final Object[] policiesFromWeb = webHdfs.getAllStoragePolicies().toArray();
    Assert.assertTrue(Arrays.equals(policiesFromDfs, policiesFromWeb));

    // Create an empty file and remember the policy it starts out with.
    DFSTestUtil.createFile(dfs, file, 0, (short) 1, 0L);
    final BlockStoragePolicySpi initialPolicy = dfs.getStoragePolicy(file);

    // Set the policy through WebHDFS, then read it back from both sides.
    final String coldPolicy = HdfsConstants.COLD_STORAGE_POLICY_NAME;
    webHdfs.setStoragePolicy(file, coldPolicy);
    final BlockStoragePolicySpi viaDfs = dfs.getStoragePolicy(file);
    final BlockStoragePolicySpi viaWeb = webHdfs.getStoragePolicy(file);
    Assert.assertEquals(coldPolicy, viaWeb.getName());
    Assert.assertEquals(viaWeb, viaDfs);

    // Unsetting must bring the file back to its initial policy.
    webHdfs.unsetStoragePolicy(file);
    Assert.assertEquals(initialPolicy, webHdfs.getStoragePolicy(file));
  }

  /**
   * Checks that setting a storage policy through WebHDFS fails with an
   * IOException when DFS_STORAGE_POLICY_ENABLED_KEY is set to false.
   */
  @Test
  public void testSetStoragePolicyWhenPolicyDisabled() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(0)
        .build();
    try {
      cluster.waitActive();
      final WebHdfsFileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(
          conf, WebHdfsConstants.WEBHDFS_SCHEME);
      final Path rootDir = new Path("/");
      webFs.setStoragePolicy(rootDir, HdfsConstants.COLD_STORAGE_POLICY_NAME);
      fail("Should throw exception, when storage policy disabled");
    } catch (IOException expected) {
      final String message = expected.getMessage();
      Assert.assertTrue(message.contains(
          "Failed to set storage policy since"));
    }
  }

  /**
   * Asserts that {@code policies} contains a policy named {@code ecpolicy}
   * and that it is in the requested state.
   *
   * @param policies policy infos to search
   * @param ecpolicy name of the erasure coding policy to look for
   * @param state "disable" to require the policy to be disabled, "enable" to
   *              require it to be enabled; any other value only checks that
   *              the policy is present
   */
  private void checkECPolicyState(Collection<ErasureCodingPolicyInfo> policies,
      String ecpolicy, String state) {
    boolean found = false;
    // Iterate the collection directly: asking it for a new iterator on every
    // pass would never reach the end, and a missing policy would surface as a
    // NoSuchElementException instead of the assertion below.
    for (ErasureCodingPolicyInfo policy : policies) {
      if (policy.getPolicy().getName().equals(ecpolicy)) {
        found = true;
        if (state.equals("disable")) {
          Assert.assertTrue(policy.isDisabled());
        } else if (state.equals("enable")) {
          Assert.assertTrue(policy.isEnabled());
        }
        break;
      }
    }
    Assert.assertTrue(found);
  }

  /**
   * Drives the erasure coding policy operations (enable, set, get, unset,
   * disable) through WebHDFS and verifies each effect through
   * DistributedFileSystem.
   */
  @Test
  public void testECPolicyCommands() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final String policyName = "RS-10-4-1024k";

    // Enable: start from a disabled policy, then enable it via WebHDFS.
    dfs.disableErasureCodingPolicy(policyName);
    checkECPolicyState(dfs.getAllErasureCodingPolicies(), policyName,
        "disable");
    webHdfs.enableECPolicy(policyName);
    checkECPolicyState(dfs.getAllErasureCodingPolicies(), policyName,
        "enable");

    final Path tmpDir = new Path("/tmp");
    dfs.mkdirs(tmpDir);

    // Set: the directory has no policy until WebHDFS assigns one.
    assertNull(dfs.getErasureCodingPolicy(tmpDir));
    webHdfs.setErasureCodingPolicy(tmpDir, policyName);
    assertEquals(policyName, dfs.getErasureCodingPolicy(tmpDir).getName());

    // Get: WebHDFS reports the policy it has just set.
    assertEquals(policyName,
        webHdfs.getErasureCodingPolicy(tmpDir).getName());

    // Unset: the directory is left without a policy again.
    webHdfs.unsetErasureCodingPolicy(tmpDir);
    assertNull(dfs.getErasureCodingPolicy(tmpDir));

    // Disable: the policy ends up disabled, as seen from DFS.
    webHdfs.disableECPolicy(policyName);
    checkECPolicyState(dfs.getAllErasureCodingPolicies(), policyName,
        "disable");
  }

  // Test for Storage Policy Satisfier in DFS.

  /**
   * Starts an external StoragePolicySatisfier against a one-datanode cluster
   * with DISK and ARCHIVE storages, marks a file COLD, requests satisfaction
   * and waits for the file to show up on ARCHIVE storage.
   */
  @Test
  public void testWebHdfsSps() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        StoragePolicySatisfierMode.EXTERNAL.toString());
    final StoragePolicySatisfier satisfier = new StoragePolicySatisfier(conf);
    try {
      final StorageType[][] storageLayout =
          new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE } };
      cluster = new MiniDFSCluster.Builder(conf)
          .storageTypes(storageLayout)
          .storagesPerDatanode(2)
          .numDataNodes(1)
          .build();
      cluster.waitActive();
      satisfier.init(new ExternalSPSContext(satisfier,
          DFSTestUtil.getNameNodeConnector(
              conf, HdfsServerConstants.MOVER_ID_PATH, 1, false)));
      // NOTE(review): start is invoked twice in a row; this looks like an
      // accidental duplicate - confirm against StoragePolicySatisfier#start
      // before removing either call.
      satisfier.start(StoragePolicySatisfierMode.EXTERNAL);
      satisfier.start(StoragePolicySatisfierMode.EXTERNAL);

      final DistributedFileSystem dfs = cluster.getFileSystem();
      final Path file = new Path("/file");
      DFSTestUtil.createFile(dfs, file, 1024L, (short) 1, 0L);
      DFSTestUtil.waitForReplication(dfs, file, (short) 1, 5000);
      dfs.setStoragePolicy(file, "COLD");
      dfs.satisfyStoragePolicy(file);
      DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 1,
          30000, dfs);
    } finally {
      satisfier.stopGracefully();
    }
  }

  /**
   * Writes a file on a three-datanode cluster, shuts down all but one
   * datanode, and checks that an append through WebHDFS fails with an
   * IOException while the file content stays unchanged.
   */
  @Test
  public void testWebHdfsAppend() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final int dnNumber = 3;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNumber).build();

    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);
    final DistributedFileSystem dfs = cluster.getFileSystem();

    final Path target = new Path("/testAppend.txt");
    final String content = "hello world";
    DFSTestUtil.writeFile(dfs, target, content);

    // Shut down datanodes 0 .. dnNumber - 2, leaving the last one running.
    int dnIndex = 0;
    while (dnIndex < dnNumber - 1) {
      cluster.shutdownDataNode(dnIndex);
      dnIndex++;
    }
    cluster.restartNameNodes();
    cluster.waitActive();

    try {
      DFSTestUtil.appendFile(webHdfs, target, content);
      fail("Should fail to append file since "
          + "datanode number is 1 and replication is 3");
    } catch (IOException ignored) {
      // The failed append must not have altered the file.
      final String afterAppend = DFSTestUtil.readFile(dfs, target);
      assertTrue(afterAppend.equals(content));
    }
  }

  /**
   * Verifies that {@link DistributedFileSystem} and
   * {@link WebHdfsFileSystem} return the same server defaults, and that
   * those defaults reflect the values overridden in the configuration.
   * @throws Exception
   */
  @Test
  public void testFsserverDefaults() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();

    // Every value is overridden so that a match cannot come from the
    // built-in defaults.
    final long blockSize = 256L * 1024 * 1024;
    final int bytesPerChecksum = 256;
    final int writePacketSize = 128 * 1024;
    final int replication = 0;
    final int ioBufferSize = 1024;
    final boolean encryptTransfer = true;
    final long trashInterval = 1;
    final String checksumName = "CRC32";
    // Setting policy to a special value 7 because BlockManager will
    // create defaultSuite with policy id 7.
    final byte storagePolicyId = (byte) 7;

    conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
    conf.setInt(DFS_BYTES_PER_CHECKSUM_KEY, bytesPerChecksum);
    conf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, writePacketSize);
    conf.setInt(DFS_REPLICATION_KEY, replication);
    conf.setInt(IO_FILE_BUFFER_SIZE_KEY, ioBufferSize);
    conf.setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, encryptTransfer);
    conf.setLong(FS_TRASH_INTERVAL_KEY, trashInterval);
    conf.set(DFS_CHECKSUM_TYPE_KEY, checksumName);

    final FsServerDefaults configured = new FsServerDefaults(
        blockSize,
        bytesPerChecksum,
        writePacketSize,
        (short) replication,
        ioBufferSize,
        encryptTransfer,
        trashInterval,
        DataChecksum.Type.valueOf(checksumName),
        "",
        storagePolicyId);

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    final FsServerDefaults fromDfs = dfs.getServerDefaults();
    final FsServerDefaults fromWebHdfs = webfs.getServerDefaults();

    // The overridden values must be what DFS reports ...
    compareFsServerDefaults(configured, fromDfs);
    // ... and WebHDFS must report the same as DFS.
    compareFsServerDefaults(fromDfs, fromWebHdfs);
    // A second request through the same WebHDFS instance must also succeed.
    webfs.getServerDefaults();
  }

  /**
   * Asserts that two server-defaults objects agree on every field this test
   * cares about.
   *
   * @param serverDefaults1 reference value, passed as the expected side
   * @param serverDefaults2 value under test, passed as the actual side
   * @throws Exception declared for callers; nothing is thrown explicitly here
   */
  private void compareFsServerDefaults(FsServerDefaults serverDefaults1,
      FsServerDefaults serverDefaults2) throws Exception {
    final FsServerDefaults expected = serverDefaults1;
    final FsServerDefaults actual = serverDefaults2;

    Assert.assertEquals("Block size is different",
        expected.getBlockSize(), actual.getBlockSize());
    Assert.assertEquals("Bytes per checksum are different",
        expected.getBytesPerChecksum(), actual.getBytesPerChecksum());
    Assert.assertEquals("Write packet size is different",
        expected.getWritePacketSize(), actual.getWritePacketSize());
    Assert.assertEquals("Default replication is different",
        expected.getReplication(), actual.getReplication());
    Assert.assertEquals("File buffer size are different",
        expected.getFileBufferSize(), actual.getFileBufferSize());
    Assert.assertEquals("Encrypt data transfer key is different",
        expected.getEncryptDataTransfer(), actual.getEncryptDataTransfer());
    Assert.assertEquals("Trash interval is different",
        expected.getTrashInterval(), actual.getTrashInterval());
    Assert.assertEquals("Checksum type is different",
        expected.getChecksumType(), actual.getChecksumType());
    Assert.assertEquals("Key provider uri is different",
        expected.getKeyProviderUri(), actual.getKeyProviderUri());
    Assert.assertEquals("Default storage policy is different",
        expected.getDefaultStoragePolicyId(),
        actual.getDefaultStoragePolicyId());
  }

  /**
   * Covers a client that knows how to request {@link FsServerDefaults}
   * talking to a namenode that does not support it: the namesystem is spied
   * to throw UnsupportedOperationException and the client call is expected
   * to surface that exception.
   * @throws Exception
   */
  @Test
  public void testFsserverDefaultsBackwardsCompatible() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);

    // Make the namenode side behave as if the operation were unknown.
    final FSNamesystem namesystemSpy =
        NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
    final UnsupportedOperationException unsupported =
        new UnsupportedOperationException();
    Mockito.when(namesystemSpy.getServerDefaults()).thenThrow(unsupported);

    try {
      webHdfs.getServerDefaults();
      Assert.fail("should have thrown UnSupportedOperationException.");
    } catch (UnsupportedOperationException expected) {
      // Expected exception.
    }
  }

  /**
   * Tests that {@link WebHdfsFileSystem.AbstractRunner} propagates original
   * exception's stacktrace and cause during runWithRetry attempts. A spied
   * ReadRunner is made to throw an IOException wrapping a DummyThrowable and
   * the exception seen by the reader must carry both the message and the
   * cause.
   * @throws Exception
   */
  @Test
  public void testExceptionPropogationInAbstractRunner() throws Exception{
    final Configuration conf = WebHdfsTestUtil.createConf();
    conf.setBoolean(HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, true);

    final short dnCount = 1;
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(dnCount)
        .build();
    cluster.waitActive();
    final FileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);

    // Create a 1 MiB file and confirm its length.
    final Path parent = new Path("/testExceptionPropogationInAbstractRunner");
    final Path testFile = new Path(parent, "testFile");
    final long fileLength = 1024L * 1024;
    DFSTestUtil.createFile(webFs, testFile, fileLength, dnCount, 20120406L);
    final FileStatus status = webFs.getFileStatus(testFile);
    assertEquals("Write failed for file " + testFile, fileLength,
        status.getLen());

    final FSDataInputStream opened = webFs.open(testFile);
    opened.read(); // Connection is made only when the first read() occurs.
    final WebHdfsInputStream webStream =
        (WebHdfsInputStream) (opened.getWrappedStream());

    // Swap in a runner whose getResponse always fails with a known cause.
    final String msg = "Throwing dummy exception";
    final IOException injected = new IOException(msg, new DummyThrowable());
    final WebHdfsFileSystem.ReadRunner failingRunner =
        spy(webStream.getReadRunner());
    doThrow(injected).when(failingRunner)
        .getResponse(any(HttpURLConnection.class));
    webStream.setReadRunner(failingRunner);

    try {
      webStream.read();
      fail("Read should have thrown IOException.");
    } catch (IOException thrown) {
      assertTrue(thrown.getMessage().contains(msg));
      assertTrue(thrown.getCause() instanceof DummyThrowable);
    }
  }

  /**
   * Tests that the LISTSTATUS and GETFILESTATUS WebHDFS calls return the
   * ecPolicy for EC files, and that LISTSTATUS reports no ecPolicy for a
   * file created before the policy was set on its directory.
   */
  @Test(timeout=300000)
  public void testECPolicyInFileStatus() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
        .getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
    final String ecPolicyName = ecPolicy.getName();
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(5)
        .build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();

    // Create an EC dir and write a test file in it
    final Path ecDir = new Path("/ec");
    Path ecFile = new Path(ecDir, "ec_file.txt");
    Path nonEcFile = new Path(ecDir, "non_ec_file.txt");
    fs.mkdirs(ecDir);

    // Create a non-EC file before enabling ec policy
    DFSTestUtil.createFile(fs, nonEcFile, 1024, (short) 1, 0);

    fs.enableErasureCodingPolicy(ecPolicyName);
    fs.setErasureCodingPolicy(ecDir, ecPolicyName);

    // Create a EC file
    DFSTestUtil.createFile(fs, ecFile, 1024, (short) 1, 0);

    // Query webhdfs REST API to list statuses of files/directories in ecDir
    InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
    URL listStatusUrl = new URL("http", addr.getHostString(), addr.getPort(),
        WebHdfsFileSystem.PATH_PREFIX + ecDir.toString() + "?op=LISTSTATUS");

    HttpURLConnection conn = (HttpURLConnection) listStatusUrl.openConnection();
    conn.setRequestMethod("GET");
    conn.setInstanceFollowRedirects(false);
    String listStatusResponse = IOUtils.toString(conn.getInputStream(),
        StandardCharsets.UTF_8);
    Assert.assertEquals("Response wasn't " + HttpURLConnection.HTTP_OK,
        HttpURLConnection.HTTP_OK, conn.getResponseCode());

    // Verify that ecPolicy is set in the ListStatus response for ec file.
    // A missing entry is reported explicitly rather than as a
    // NullPointerException further down.
    JSONObject ecFileStatus =
        getFileStatusJson(listStatusResponse, ecFile.getName());
    Assert.assertNotNull("ListStatus response has no entry for "
        + ecFile.getName(), ecFileStatus);
    String ecpolicyForECfile = getECPolicyFromFileStatusJson(ecFileStatus);
    assertEquals("EC policy for ecFile should match the set EC policy",
        ecPolicyName, ecpolicyForECfile);

    // Verify that ecPolicy is not set in the ListStatus response for non-ec
    // file
    JSONObject nonEcFileStatus =
        getFileStatusJson(listStatusResponse, nonEcFile.getName());
    Assert.assertNotNull("ListStatus response has no entry for "
        + nonEcFile.getName(), nonEcFileStatus);
    String ecPolicyForNonECfile =
        getECPolicyFromFileStatusJson(nonEcFileStatus);
    assertNull("EC policy for nonEcFile should be null (not set)",
        ecPolicyForNonECfile);

    // Query webhdfs REST API to get fileStatus for ecFile
    URL getFileStatusUrl = new URL("http", addr.getHostString(), addr.getPort(),
        WebHdfsFileSystem.PATH_PREFIX + ecFile.toString() +
            "?op=GETFILESTATUS");

    conn = (HttpURLConnection) getFileStatusUrl.openConnection();
    conn.setRequestMethod("GET");
    conn.setInstanceFollowRedirects(false);
    String getFileStatusResponse = IOUtils.toString(conn.getInputStream(),
        StandardCharsets.UTF_8);
    Assert.assertEquals("Response wasn't " + HttpURLConnection.HTTP_OK,
        HttpURLConnection.HTTP_OK, conn.getResponseCode());

    // Verify that ecPolicy is set in getFileStatus response for ecFile
    JSONObject fileStatusObject = new JSONObject(getFileStatusResponse)
        .getJSONObject("FileStatus");
    ecpolicyForECfile = getECPolicyFromFileStatusJson(fileStatusObject);
    assertEquals("EC policy for ecFile should match the set EC policy",
        ecPolicyName, ecpolicyForECfile);
  }

  /**
   * Checks that storage-policy and snapshottable-directory operations issued
   * through WebHDFS bump the read/write op counters and the matching
   * per-operation statistics by exactly one.
   */
  @Test
  public void testStatistics() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        StoragePolicySatisfierMode.EXTERNAL.toString());
    StoragePolicySatisfier sps = new StoragePolicySatisfier(conf);
    try {
      cluster = new MiniDFSCluster.Builder(conf).storageTypes(
          new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
          .storagesPerDatanode(2).numDataNodes(1).build();
      cluster.waitActive();
      sps.init(new ExternalSPSContext(sps, DFSTestUtil
          .getNameNodeConnector(conf, HdfsServerConstants.MOVER_ID_PATH, 1,
              false)));
      // NOTE(review): start is invoked twice in a row; this looks like an
      // accidental duplicate - confirm against StoragePolicySatisfier#start
      // before removing either call.
      sps.start(StoragePolicySatisfierMode.EXTERNAL);
      sps.start(StoragePolicySatisfierMode.EXTERNAL);
      final WebHdfsFileSystem webHdfs = WebHdfsTestUtil
          .getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
      Path dir = new Path("/test");
      webHdfs.mkdirs(dir);
      int readOps = 0;
      int writeOps = 0;
      // Counters are reset after the mkdirs above so that only the calls
      // below are counted.
      FileSystem.clearStatistics();

      long opCount =
          getOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICY);
      webHdfs.getStoragePolicy(dir);
      checkStatistics(webHdfs, ++readOps, writeOps, 0);
      checkOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICY,
          opCount + 1);

      opCount =
          getOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICIES);
      webHdfs.getAllStoragePolicies();
      checkStatistics(webHdfs, ++readOps, writeOps, 0);
      checkOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICIES,
          opCount + 1);

      opCount =
          getOpStatistics(DFSOpsCountStatistics.OpType.SATISFY_STORAGE_POLICY);
      webHdfs.satisfyStoragePolicy(dir);
      checkStatistics(webHdfs, readOps, ++writeOps, 0);
      checkOpStatistics(DFSOpsCountStatistics.OpType.SATISFY_STORAGE_POLICY,
          opCount + 1);

      opCount = getOpStatistics(
          DFSOpsCountStatistics.OpType.GET_SNAPSHOTTABLE_DIRECTORY_LIST);
      webHdfs.getSnapshottableDirectoryList();
      checkStatistics(webHdfs, ++readOps, writeOps, 0);
      checkOpStatistics(
          DFSOpsCountStatistics.OpType.GET_SNAPSHOTTABLE_DIRECTORY_LIST,
          opCount + 1);
    } finally {
      try {
        // Stop the satisfier the same way testWebHdfsSps does, so it is not
        // left running after the test.
        sps.stopGracefully();
      } finally {
        // The cluster may never have been built; a null guard keeps the
        // original failure from being replaced by a NullPointerException.
        if (cluster != null) {
          cluster.shutdown();
        }
      }
    }
  }

  /**
   * Creates a symbolic link through WebHDFS and checks that getLinkTarget
   * resolves it to the file it was created for.
   */
  @Test
  public void testLinkTarget() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    try {
      cluster = new MiniDFSCluster.Builder(conf)
          .numDataNodes(3)
          .build();
      cluster.waitActive();

      final WebHdfsFileSystem webHdfs =
          WebHdfsTestUtil.getWebHdfsFileSystem(conf,
              WebHdfsConstants.WEBHDFS_SCHEME);

      // Symbolic link
      Path root = new Path("/webHdfsTest/");
      Path targetFile = new Path(root, "debug.log");
      FileSystemTestHelper.createFile(webHdfs, targetFile);

      Path symLink = new Path(root, "debug.link");

      webHdfs.createSymlink(targetFile, symLink, false);
      // Expected value first so that a failure is reported the right way
      // round.
      assertEquals(targetFile, webHdfs.getLinkTarget(symLink));
    } finally {
      // The cluster may never have been built; a null guard keeps the
      // original failure from being replaced by a NullPointerException.
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Checks that getFileLinkStatus through WebHDFS reports a regular file as
   * not being a symlink and a link created for it as being one.
   */
  @Test
  public void testFileLinkStatus() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    try {
      cluster = new MiniDFSCluster.Builder(conf).build();
      cluster.waitActive();

      final WebHdfsFileSystem webHdfs =
          WebHdfsTestUtil.getWebHdfsFileSystem(conf,
              WebHdfsConstants.WEBHDFS_SCHEME);
      // Symbolic link
      Path root = new Path("/webHdfsTest/");
      Path file = new Path(root, "file");
      FileSystemTestHelper.createFile(webHdfs, file);

      Path linkToFile = new Path(root, "linkToFile");

      webHdfs.createSymlink(file, linkToFile, false);
      assertFalse(webHdfs.getFileLinkStatus(file).isSymlink());
      assertTrue(webHdfs.getFileLinkStatus(linkToFile).isSymlink());
    } finally {
      // The cluster may never have been built; a null guard keeps the
      // original failure from being replaced by a NullPointerException.
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Writes a small file and checks that the used, remaining and capacity
   * figures WebHDFS reports for "/" equal those reported by
   * DistributedFileSystem.
   */
  @Test
  public void testFsStatus() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    try {
      cluster = new MiniDFSCluster.Builder(conf).build();
      cluster.waitActive();

      final WebHdfsFileSystem webHdfs =
          WebHdfsTestUtil.getWebHdfsFileSystem(conf,
              WebHdfsConstants.WEBHDFS_SCHEME);

      final DistributedFileSystem dfs = cluster.getFileSystem();

      final String path = "/foo";
      try (OutputStream os = webHdfs.create(new Path(path))) {
        os.write(new byte[1024]);
      }

      FsStatus webHdfsFsStatus = webHdfs.getStatus(new Path("/"));
      Assert.assertNotNull(webHdfsFsStatus);

      FsStatus dfsFsStatus = dfs.getStatus(new Path("/"));
      Assert.assertNotNull(dfsFsStatus);

      //Validate used free and capacity are the same as DistributedFileSystem
      // (DFS is the reference, so its values go in the expected position).
      Assert.assertEquals(dfsFsStatus.getUsed(), webHdfsFsStatus.getUsed());
      Assert.assertEquals(dfsFsStatus.getRemaining(),
          webHdfsFsStatus.getRemaining());
      Assert.assertEquals(dfsFsStatus.getCapacity(),
          webHdfsFsStatus.getCapacity());
    } finally {
      // The cluster may never have been built; a null guard keeps the
      // original failure from being replaced by a NullPointerException.
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Checks that WebHDFS returns the same number of erasure coding policy
   * infos as DistributedFileSystem and that each of them is also present in
   * the DFS result.
   */
  @Test
  public void testGetErasureCodingPolicies() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      cluster.waitActive();

      final WebHdfsFileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(
          conf, WebHdfsConstants.WEBHDFS_SCHEME);
      final DistributedFileSystem dfs = cluster.getFileSystem();

      final Collection<ErasureCodingPolicyInfo> fromWebHdfs =
          webFs.getAllErasureCodingPolicies();
      final Collection<ErasureCodingPolicyInfo> fromDfs =
          dfs.getAllErasureCodingPolicies();

      // Same size, and nothing in the WebHDFS result that DFS does not have.
      final int expectedCount = fromDfs.size();
      assertEquals(expectedCount, fromWebHdfs.size());
      final boolean allKnownToDfs = fromDfs.containsAll(fromWebHdfs);
      assertTrue(allKnownToDfs);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Checks that the erasure coding codec map returned by WebHDFS equals the
   * one returned by DistributedFileSystem.
   */
  @Test
  public void getAllErasureCodingCodecs() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      cluster.waitActive();

      final WebHdfsFileSystem webHdfs =
          WebHdfsTestUtil.getWebHdfsFileSystem(conf,
              WebHdfsConstants.WEBHDFS_SCHEME);

      final DistributedFileSystem dfs = cluster.getFileSystem();

      Map<String, String> webHdfsEcCodecs = webHdfs.getAllErasureCodingCodecs();

      Map<String, String> dfsEcCodecs = dfs.getAllErasureCodingCodecs();

      //Validate erasureCodingCodecs are the same as DistributedFileSystem
      // (DFS is the reference, so its map goes in the expected position).
      assertEquals(dfsEcCodecs, webHdfsEcCodecs);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Creates trash directories for the current user and for "user0" and
   * checks how many trash roots WebHDFS reports with and without the
   * all-users flag.
   */
  @Test
  public void testGetTrashRoots() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf();
    cluster = new MiniDFSCluster.Builder(conf).build();
    final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);

    // Create user trash
    final Path ownHome = webHdfs.getHomeDirectory();
    webHdfs.mkdirs(new Path(ownHome, FileSystem.TRASH_PREFIX));

    Collection<FileStatus> roots = webHdfs.getTrashRoots(true);
    assertEquals(1, roots.size());

    // Create trash root for user0
    final UserGroupInformation otherUser =
        UserGroupInformation.createRemoteUser("user0");
    final String otherHome = DFSUtilClient.getHomeDirectory(conf, otherUser);
    webHdfs.mkdirs(new Path(otherHome, FileSystem.TRASH_PREFIX));

    // Asking for all users now yields both trash roots ...
    roots = webHdfs.getTrashRoots(true);
    assertEquals(2, roots.size());

    // ... while asking without the all-users flag yields one.
    roots = webHdfs.getTrashRoots(false);
    assertEquals(1,  roots.size());
  }

  /**
   * Looks up one entry of a ListStatus response by file name.
   *
   * @param response raw JSON text of a LISTSTATUS response
   * @param fileName value to match against each entry's "pathSuffix"
   * @return the matching FileStatus JSON object, or null if none matches
   * @throws JSONException if the response does not have the expected shape
   */
  private JSONObject getFileStatusJson(String response, String fileName)
      throws JSONException {
    final JSONObject root = new JSONObject(response);
    final JSONObject statuses = root.getJSONObject("FileStatuses");
    final JSONArray entries = statuses.getJSONArray("FileStatus");

    int index = 0;
    while (index < entries.length()) {
      final JSONObject entry = entries.getJSONObject(index);
      final Object suffix = entry.get("pathSuffix");
      if (fileName.equals(suffix)) {
        return entry;
      }
      index++;
    }
    return null;
  }

  /**
   * Reads the EC policy name out of a FileStatus JSON object.
   *
   * @param fileStatusJsonObject FileStatus JSON object to inspect
   * @return the value of "ecPolicy", or null when the key is absent
   * @throws JSONException if the value cannot be read as a string
   */
  private String getECPolicyFromFileStatusJson(JSONObject fileStatusJsonObject)
      throws JSONException {
    return fileStatusJsonObject.has("ecPolicy")
        ? fileStatusJsonObject.getString("ecPolicy")
        : null;
  }

  /**
   * Marker throwable used as the cause of an injected IOException so that a
   * test can check the cause survives propagation.
   */
  static final class DummyThrowable extends Throwable {
  }
}