/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

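/**
 * Tests that the namespace and storage space usage cached by the directory
 * quota feature stays consistent with the actual file system state across
 * file create, append, fsync, truncate, quota initialization, and
 * replication changes.
 */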
public class TestDiskspaceQuotaUpdate {
  private static final int BLOCKSIZE = 1024;
  private static final short REPLICATION = 4;
  static final long seed = 0L;
  private static final Path BASE_DIR = new Path("/TestQuotaUpdate");

  private static Configuration conf;
  private static MiniDFSCluster cluster;

  @BeforeClass
  public static void setUp() throws Exception {
    conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
        .build();
    cluster.waitActive();
  }

  @Before
  public void resetCluster() throws Exception {
    if (!cluster.isClusterUp()) {
      // Previous test seems to have left cluster in a bad state;
      // recreate the cluster to protect subsequent tests
      cluster.shutdown();
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
        .build();
      cluster.waitActive();
    }
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  private Path getParent(String testName) {
    return new Path(BASE_DIR, testName);
  }

  private FSDirectory getFSDirectory() {
    return cluster.getNamesystem().getFSDirectory();
  }

  private DistributedFileSystem getDFS() throws IOException {
    return cluster.getFileSystem();
  }

  /**
   * Test that the quota is correctly updated when a file is created.
   */
  @Test (timeout=60000)
  public void testQuotaUpdateWithFileCreate() throws Exception {
    final Path foo =
        new Path(getParent(GenericTestUtils.getMethodName()), "foo");
    Path createdFile = new Path(foo, "created_file.data");
    getDFS().mkdirs(foo);
    getDFS().setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
    long fileLen = BLOCKSIZE * 2 + BLOCKSIZE / 2;
    DFSTestUtil.createFile(getDFS(), createdFile, BLOCKSIZE / 16,
        fileLen, BLOCKSIZE, REPLICATION, seed);
    INode fnode = getFSDirectory().getINode4Write(foo.toString());
    assertTrue(fnode.isDirectory());
    assertTrue(fnode.isQuotaSet());
    QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
        .getSpaceConsumed();
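    // expect two namespace items (the directory foo and the created file) and
    // storage space equal to the file length times the replication factor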
    assertEquals(2, cnt.getNameSpace());
    assertEquals(fileLen * REPLICATION, cnt.getStorageSpace());
  }

  /**
   * Test that the quota is correctly updated when data is appended to a file.
   */
  @Test (timeout=60000)
  public void testUpdateQuotaForAppend() throws Exception {
    final Path foo =
        new Path(getParent(GenericTestUtils.getMethodName()), "foo");
    final Path bar = new Path(foo, "bar");
    long currentFileLen = BLOCKSIZE;
    DFSTestUtil.createFile(getDFS(), bar, currentFileLen, REPLICATION, seed);
    getDFS().setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);

    // append half of the block data, the previous file length is at block
    // boundary
    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE / 2);
    currentFileLen += (BLOCKSIZE / 2);

    INodeDirectory fooNode =
        getFSDirectory().getINode4Write(foo.toString()).asDirectory();
    assertTrue(fooNode.isQuotaSet());
    QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed();
    long ns = quota.getNameSpace();
    long ds = quota.getStorageSpace();
    assertEquals(2, ns); // foo and bar
    assertEquals(currentFileLen * REPLICATION, ds);
    ContentSummary c = getDFS().getContentSummary(foo);
    assertEquals(c.getSpaceConsumed(), ds);

    // append another block, the previous file length is not at block boundary
    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE);
    currentFileLen += BLOCKSIZE;

    quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
    ns = quota.getNameSpace();
    ds = quota.getStorageSpace();
    assertEquals(2, ns); // foo and bar
    assertEquals(currentFileLen * REPLICATION, ds);
    c = getDFS().getContentSummary(foo);
    assertEquals(c.getSpaceConsumed(), ds);

    // append several blocks
    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE * 3 + BLOCKSIZE / 8);
    currentFileLen += (BLOCKSIZE * 3 + BLOCKSIZE / 8);

    quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
    ns = quota.getNameSpace();
    ds = quota.getStorageSpace();
    assertEquals(2, ns); // foo and bar
    assertEquals(currentFileLen * REPLICATION, ds);
    c = getDFS().getContentSummary(foo);
    assertEquals(c.getSpaceConsumed(), ds);
  }

  /**
   * Test that the quota is correctly updated when the file length is updated
   * through fsync.
   */
  @Test (timeout=60000)
  public void testUpdateQuotaForFSync() throws Exception {
    final Path foo =
        new Path(getParent(GenericTestUtils.getMethodName()), "foo");
    final Path bar = new Path(foo, "bar");
    DFSTestUtil.createFile(getDFS(), bar, BLOCKSIZE, REPLICATION, 0L);
    getDFS().setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);

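    // append a quarter block and hsync with UPDATE_LENGTH so the NameNode
    // records the new length while the file is still under construction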
    FSDataOutputStream out = getDFS().append(bar);
    out.write(new byte[BLOCKSIZE / 4]);
    ((DFSOutputStream) out.getWrappedStream())
        .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));

    INodeDirectory fooNode =
        getFSDirectory().getINode4Write(foo.toString()).asDirectory();
    QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed();
    long ns = quota.getNameSpace();
    long ds = quota.getStorageSpace();
    assertEquals(2, ns); // foo and bar
    // the file is under construction, so its last block is charged as a full
    // block against the storage space quota
    assertEquals(BLOCKSIZE * 2 * REPLICATION, ds);

    out.write(new byte[BLOCKSIZE / 4]);
    out.close();

    fooNode = getFSDirectory().getINode4Write(foo.toString()).asDirectory();
    quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
    ns = quota.getNameSpace();
    ds = quota.getStorageSpace();
    assertEquals(2, ns);
    assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds);

    // append another block
    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE);

    quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
    ns = quota.getNameSpace();
    ds = quota.getStorageSpace();
    assertEquals(2, ns); // foo and bar
    assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds);
  }

  /**
   * Test that an append rejected by the storage space quota check does not
   * mark the file as under construction or leave a dangling lease.
   */
  @Test (timeout=60000)
  public void testAppendOverStorageQuota() throws Exception {
    final Path dir = getParent(GenericTestUtils.getMethodName());
    final Path file = new Path(dir, "file");

    // create partial block file
    getDFS().mkdirs(dir);
    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE / 2, REPLICATION, seed);

    // lower quota to cause exception when appending to partial block
    getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1);
    final INodeDirectory dirNode =
        getFSDirectory().getINode4Write(dir.toString()).asDirectory();
    final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().getStorageSpace();
    try {
      DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE);
      Assert.fail("append didn't fail");
    } catch (DSQuotaExceededException e) {
      // ignore
    }

    LeaseManager lm = cluster.getNamesystem().getLeaseManager();
    // check that the file exists, isn't UC, and has no dangling lease
    INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
    Assert.assertNotNull(inode);
    Assert.assertFalse("should not be UC", inode.isUnderConstruction());
    Assert.assertNull("should not have a lease", lm.getLease(inode));
    // make sure the quota usage is unchanged
    final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().getStorageSpace();
    assertEquals(spaceUsed, newSpaceUsed);
    // make sure edits aren't corrupted
    getDFS().recoverLease(file);
    cluster.restartNameNode(true);
  }

  /**
   * Test that an append rejected by a storage type quota check does not
   * mark the file as under construction or leave a dangling lease.
   */
  @Test (timeout=60000)
  public void testAppendOverTypeQuota() throws Exception {
    final Path dir = getParent(GenericTestUtils.getMethodName());
    final Path file = new Path(dir, "file");

    // create partial block file
    getDFS().mkdirs(dir);
    // set the storage policy on dir
    getDFS().setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
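    // ONE_SSD places one replica on SSD and the remaining replicas on DISK,
    // so only a single replica counts against the SSD storage type quota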
    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE / 2, REPLICATION, seed);

    // set the SSD storage type quota to 1 byte
    getDFS().setQuotaByStorageType(dir, StorageType.SSD, 1L);
    final INodeDirectory dirNode =
        getFSDirectory().getINode4Write(dir.toString()).asDirectory();
    final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().getStorageSpace();
    try {
      DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE);
      Assert.fail("append didn't fail");
    } catch (QuotaByStorageTypeExceededException e) {
      //ignore
    }

    // check that the file exists, isn't UC, and has no dangling lease
    LeaseManager lm = cluster.getNamesystem().getLeaseManager();
    INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
    Assert.assertNotNull(inode);
    Assert.assertFalse("should not be UC", inode.isUnderConstruction());
    Assert.assertNull("should not have a lease", lm.getLease(inode));
    // make sure the quota usage is unchanged
    final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().getStorageSpace();
    assertEquals(spaceUsed, newSpaceUsed);
    // make sure edits aren't corrupted
    getDFS().recoverLease(file);
    cluster.restartNameNode(true);
  }

  /**
   * Test that a truncate rejected by the storage space quota check does not
   * mark the file as under construction or leave a dangling lease.
   */
  @Test (timeout=60000)
  public void testTruncateOverQuota() throws Exception {
    final Path dir = getParent(GenericTestUtils.getMethodName());
    final Path file = new Path(dir, "file");

    // create partial block file
    getDFS().mkdirs(dir);
    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE / 2, REPLICATION, seed);

    // lower quota to cause exception when truncating to a mid-block offset
    getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1);
    final INodeDirectory dirNode =
        getFSDirectory().getINode4Write(dir.toString()).asDirectory();
    final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().getStorageSpace();
    try {
      getDFS().truncate(file, BLOCKSIZE / 2 - 1);
      Assert.fail("truncate didn't fail");
    } catch (RemoteException e) {
      assertTrue(e.getClassName().contains("DSQuotaExceededException"));
    }

    // check that the file exists, isn't UC, and has no dangling lease
    LeaseManager lm = cluster.getNamesystem().getLeaseManager();
    INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
    Assert.assertNotNull(inode);
    Assert.assertFalse("should not be UC", inode.isUnderConstruction());
    Assert.assertNull("should not have a lease", lm.getLease(inode));
    // make sure the quota usage is unchanged
    final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().getStorageSpace();
    assertEquals(spaceUsed, newSpaceUsed);
    // make sure edits aren't corrupted
    getDFS().recoverLease(file);
    cluster.restartNameNode(true);
  }

  /**
   * Check whether the quota is initialized correctly.
   */
  @Test
  public void testQuotaInitialization() throws Exception {
    final int size = 500;
    Path testDir =
        new Path(getParent(GenericTestUtils.getMethodName()), "testDir");
    long expectedSize = 3 * BLOCKSIZE + BLOCKSIZE / 2;
    getDFS().mkdirs(testDir);
    getDFS().setQuota(testDir, size * 4, expectedSize * size * 2);

    Path[] testDirs = new Path[size];
    for (int i = 0; i < size; i++) {
      testDirs[i] = new Path(testDir, "sub" + i);
      getDFS().mkdirs(testDirs[i]);
      getDFS().setQuota(testDirs[i], 100, 1000000);
      DFSTestUtil.createFile(getDFS(), new Path(testDirs[i], "a"), expectedSize,
          (short)1, 1L);
    }

    // Directly access the name system to obtain the current cached usage.
    INodeDirectory root = getFSDirectory().getRoot();
    HashMap<String, Long> nsMap = new HashMap<>();
    HashMap<String, Long> dsMap = new HashMap<>();
    scanDirsWithQuota(root, nsMap, dsMap, false);

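    // recompute the usage with 1, 2, and 4 initialization threads and verify
    // that each recomputation matches the cached values captured above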
    updateCountForQuota(1);
    scanDirsWithQuota(root, nsMap, dsMap, true);

    updateCountForQuota(2);
    scanDirsWithQuota(root, nsMap, dsMap, true);

    updateCountForQuota(4);
    scanDirsWithQuota(root, nsMap, dsMap, true);
  }

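  /**
   * Recompute all cached quota counts under the FS write lock, using the
   * given number of quota initialization threads.
   */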
  private void updateCountForQuota(int initThreads) {
    FSNamesystem fsn = cluster.getNamesystem();
    fsn.writeLock(RwLockMode.FS);
    try {
      getFSDirectory().updateCountForQuota(initThreads);
    } finally {
      fsn.writeUnlock(RwLockMode.FS, "updateCountForQuota");
    }
  }

  private void scanDirsWithQuota(INodeDirectory dir,
      HashMap<String, Long> nsMap,
      HashMap<String, Long> dsMap, boolean verify) {
    if (dir.isQuotaSet()) {
      // get the current consumption
      QuotaCounts q = dir.getDirectoryWithQuotaFeature().getSpaceConsumed();
      String name = dir.getFullPathName();
      if (verify) {
        assertEquals(nsMap.get(name).longValue(), q.getNameSpace());
        assertEquals(dsMap.get(name).longValue(), q.getStorageSpace());
      } else {
        nsMap.put(name, Long.valueOf(q.getNameSpace()));
        dsMap.put(name, Long.valueOf(q.getStorageSpace()));
      }
    }

    for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
      if (child instanceof INodeDirectory) {
        scanDirsWithQuota((INodeDirectory)child, nsMap, dsMap, verify);
      }
    }
  }

  /**
   * Test that the cached quota stays correct between the COMMIT
   * and COMPLETE block steps, even if the replication factor is
   * changed during this time.
   */
  @Test (timeout=60000)
  public void testQuotaIssuesWhileCommitting() throws Exception {
    // We want a one-DN cluster so that we can force a lack of
    // commit by only instrumenting a single DN; we kill the other 3
    List<MiniDFSCluster.DataNodeProperties> dnprops = new ArrayList<>();
    try {
      for (int i = REPLICATION - 1; i > 0; i--) {
        dnprops.add(cluster.stopDataNode(i));
      }

      DatanodeProtocolClientSideTranslatorPB nnSpy =
          InternalDataNodeTestUtils.spyOnBposToNN(
              cluster.getDataNodes().get(0), cluster.getNameNode());

      testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 4);
      testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 4, (short) 1);

      // Don't actually change replication; just check that the sizes
      // agree during the commit period
      testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 1);
    } finally {
      for (MiniDFSCluster.DataNodeProperties dnprop : dnprops) {
        cluster.restartDataNode(dnprop);
      }
      cluster.waitActive();
    }
  }

  private void testQuotaIssuesWhileCommittingHelper(
      DatanodeProtocolClientSideTranslatorPB nnSpy,
      final short initialReplication, final short finalReplication)
      throws Exception {
    final String logStmt =
        "BUG: Inconsistent storagespace for directory";
    final Path dir = new Path(getParent(GenericTestUtils.getMethodName()),
        String.format("%d-%d", initialReplication, finalReplication));
    final Path file = new Path(dir, "testfile");

    LogCapturer logs = LogCapturer.captureLogs(NameNode.LOG);

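    // Interpose on the DN's blockReceivedAndDeleted report: change the
    // replication (when requested) and fetch the content summary while the
    // last block is committed on the NN but not yet completed.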
    Mockito.doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        if (finalReplication != initialReplication) {
          getDFS().setReplication(file, finalReplication);
        }
        // Call getContentSummary before the DN can notify the NN that the
        // block has been received, to check for a discrepancy between the
        // cached and computed usage
        getDFS().getContentSummary(dir);
        invocation.callRealMethod();
        return null;
      }
    }).when(nnSpy).blockReceivedAndDeleted(any(), anyString(), any());

    getDFS().mkdirs(dir);
    getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);

    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, initialReplication, 1L);

    // Also check for discrepancy after completing the file
    getDFS().getContentSummary(dir);
    assertFalse(logs.getOutput().contains(logStmt));
  }

  /**
   * Test that the cached quota remains correct when the block has been
   * written to but not yet committed, even if the replication factor
   * is updated during this time.
   */
  private void testQuotaIssuesBeforeCommitting(short initialReplication,
      short finalReplication) throws Exception {
    final String logStmt =
        "BUG: Inconsistent storagespace for directory";
    final Path dir = new Path(getParent(GenericTestUtils.getMethodName()),
        String.format("%d-%d", initialReplication, finalReplication));
    final Path file = new Path(dir, "testfile");

    LogCapturer logs = LogCapturer.captureLogs(NameNode.LOG);

    getDFS().mkdirs(dir);
    getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);

    FSDataOutputStream out =
        TestFileCreation.createFile(getDFS(), file, initialReplication);
    TestFileCreation.writeFile(out, BLOCKSIZE / 2);
    out.hflush();

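    // The hflush makes the block visible to the NameNode without committing
    // it; the computed summary must still agree with the cached quota usage.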
    getDFS().getContentSummary(dir);
    if (finalReplication != initialReplication) {
      // While the block is visible to the NN but has not yet been committed,
      // change the replication
      getDFS().setReplication(file, finalReplication);
    }

    out.close();

    getDFS().getContentSummary(dir);
    assertFalse(logs.getOutput().contains(logStmt));
  }

  @Test (timeout=60000)
  public void testCachedComputedSizesAgreeBeforeCommitting() throws Exception {
    // Don't actually change replication; just check that the sizes
    // agree before the commit period
    testQuotaIssuesBeforeCommitting((short) 1, (short) 1);
  }

  @Test (timeout=60000)
  public void testDecreaseReplicationBeforeCommitting() throws Exception {
    testQuotaIssuesBeforeCommitting((short) 4, (short) 1);
  }

  @Test (timeout=60000)
  public void testIncreaseReplicationBeforeCommitting() throws Exception {
    testQuotaIssuesBeforeCommitting((short) 1, (short) 4);
  }
}