/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.util.Collection;
import java.util.EnumSet;
import java.util.function.Supplier;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.slf4j.event.Level;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import javax.management.MBeanServer;
import javax.management.ObjectName;

/**
 * Ensure that the DN reserves disk space equivalent to a full block for
 * replicas being written (RBW) and replicas being copied from another DN.
 */
public class TestSpaceReservation {
  static final Logger LOG = LoggerFactory.getLogger(TestSpaceReservation.class);

  private static final int DU_REFRESH_INTERVAL_MSEC = 500;
  private static final int STORAGES_PER_DATANODE = 1;
  private static final int BLOCK_SIZE = 1024 * 1024;
  private static final int SMALL_BLOCK_SIZE = 1024;

  protected MiniDFSCluster cluster;
  private Configuration conf;
  private DistributedFileSystem fs = null;
  private DFSClient client = null;
  FsVolumeReference singletonVolumeRef = null;
  FsVolumeImpl singletonVolume = null;
  private DataNodeFaultInjector old = null;

  private static Random rand = new Random();

  @Before
  public void before() {
    conf = new HdfsConfiguration();
  }

  private void initConfig(int blockSize) {
    // Refresh disk usage information frequently.
    conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
    conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);

    // Disable the scanner
    conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
  }

  static {
    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.TRACE);
  }

  /**
   * Start a cluster with the given block size, DataNode count and
   * per-volume capacity limit.
   *
   * @param blockSize block size for files in the cluster.
   * @param numDatanodes number of DataNodes to start.
   * @param perVolumeCapacity limit the capacity of each volume to the given
   *                          value. If negative, then don't limit.
   * @throws IOException
   */
  private void startCluster(int blockSize, int numDatanodes,
      long perVolumeCapacity) throws IOException {
    initConfig(blockSize);

    cluster = new MiniDFSCluster
        .Builder(conf)
        .storagesPerDatanode(STORAGES_PER_DATANODE)
        .numDataNodes(numDatanodes)
        .build();
    fs = cluster.getFileSystem();
    client = fs.getClient();
    cluster.waitActive();

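    // Cap the volume's reported capacity via setCapacityForTesting() so
    // tests can simulate a nearly-full disk without actually filling it.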
    if (perVolumeCapacity >= 0) {
      try (FsDatasetSpi.FsVolumeReferences volumes =
          cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
        singletonVolumeRef = volumes.get(0).obtainReference();
      }
      singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
      singletonVolume.setCapacityForTesting(perVolumeCapacity);
    }
  }

  @After
  public void shutdownCluster() throws IOException {
    if (singletonVolumeRef != null) {
      singletonVolumeRef.close();
      singletonVolumeRef = null;
    }

    if (client != null) {
      client.close();
      client = null;
    }

    if (fs != null) {
      fs.close();
      fs = null;
    }

    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
    if (old != null) {
      DataNodeFaultInjector.set(old);
    }
  }

  private void createFileAndTestSpaceReservation(
      final String fileNamePrefix, final int fileBlockSize)
      throws IOException, InterruptedException {
    // Enough for 1 block + meta files + some delta.
    final long configuredCapacity = fileBlockSize * 2 - 1;
    startCluster(BLOCK_SIZE, 1, configuredCapacity);
    FSDataOutputStream out = null;
    Path path = new Path("/" + fileNamePrefix + ".dat");

    try {
      out = fs.create(path, false, 4096, (short) 1, fileBlockSize);

      byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
      out.write(buffer);
      out.hsync();
      int bytesWritten = buffer.length;

      // Check that space was reserved for a full block minus the bytesWritten.
      assertThat(singletonVolume.getReservedForReplicas(),
                 is((long) fileBlockSize - bytesWritten));
      out.close();
      out = null;

      // Check that the reserved space has been released since we closed the
      // file.
      assertThat(singletonVolume.getReservedForReplicas(), is(0L));

      // Reopen the file for append and write the buffer again.
      out = fs.append(path);
      out.write(buffer);
      out.hsync();
      bytesWritten += buffer.length;

      // Check that space was again reserved for a full block minus the
      // bytesWritten so far.
      assertThat(singletonVolume.getReservedForReplicas(),
                 is((long) fileBlockSize - bytesWritten));

      // Write once again and again verify the available space. This ensures
      // that the reserved space is progressively adjusted to account for bytes
      // written to disk.
      out.write(buffer);
      out.hsync();
      bytesWritten += buffer.length;
      assertThat(singletonVolume.getReservedForReplicas(),
                 is((long) fileBlockSize - bytesWritten));
    } finally {
      if (out != null) {
        out.close();
      }
    }
  }

  @Test (timeout=300000)
  public void testWithDefaultBlockSize()
      throws IOException, InterruptedException {
    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
  }

  @Test (timeout=300000)
  public void testWithNonDefaultBlockSize()
      throws IOException, InterruptedException {
    // Same test as previous one, but with a non-default block size.
    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
  }

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  @Test (timeout=300000)
  public void testWithLimitedSpace() throws IOException {
    // Cluster with just enough space for a full block + meta.
    startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
    final String methodName = GenericTestUtils.getMethodName();
    Path file1 = new Path("/" + methodName + ".01.dat");
    Path file2 = new Path("/" + methodName + ".02.dat");

    // Create two files.
    FSDataOutputStream os1 = null, os2 = null;

    try {
      os1 = fs.create(file1);
      os2 = fs.create(file2);

      // Write one byte to the first file.
      byte[] data = new byte[1];
      os1.write(data);
      os1.hsync();

      // Try to write one byte to the second file.
      // The block allocation must fail.
      thrown.expect(RemoteException.class);
      os2.write(data);
      os2.hsync();
    } finally {
      if (os1 != null) {
        os1.close();
      }

      // os2.close() will fail as no block was allocated.
    }
  }

  /**
   * Ensure that reserved space is released when the client goes away
   * unexpectedly.
   *
   * The verification is done for each replica in the write pipeline.
   *
   * @throws IOException
   */
  @Test(timeout=300000)
  public void testSpaceReleasedOnUnexpectedEof()
      throws IOException, InterruptedException, TimeoutException {
    final short replication = 3;
    startCluster(BLOCK_SIZE, replication, -1);

    final String methodName = GenericTestUtils.getMethodName();
    final Path file = new Path("/" + methodName + ".01.dat");

    // Write 1 byte to the file and kill the writer.
    FSDataOutputStream os = fs.create(file, replication);
    os.write(new byte[1]);
    os.hsync();
    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());

    // Ensure all space reserved for the replica was released on each
    // DataNode.
    for (DataNode dn : cluster.getDataNodes()) {
      try (FsDatasetSpi.FsVolumeReferences volumes =
          dn.getFSDataset().getFsVolumeReferences()) {
        final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return (volume.getReservedForReplicas() == 0);
          }
        }, 500, Integer.MAX_VALUE); // Wait until the test times out.
      }
    }
  }

  @SuppressWarnings("unchecked")
  @Test(timeout = 30000)
  public void testRBWFileCreationError() throws Exception {

    final short replication = 1;
    startCluster(BLOCK_SIZE, replication, -1);

    final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
        .get(0).getFSDataset().getFsVolumeReferences().get(0);
    final String methodName = GenericTestUtils.getMethodName();
    final Path file = new Path("/" + methodName + ".01.dat");

    // Mock BlockPoolSlice so that RBW file creation throws an IOException.
    BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
    Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any()))
        .thenThrow(new IOException("Synthetic IO Exception Through MOCK"));

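    // Swap the mocked BlockPoolSlice into the volume's private bpSlices map
    // via reflection so the next RBW file creation hits the mock.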
    Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
    field.setAccessible(true);
    Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
        .get(fsVolumeImpl);
    bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);

    try {
      // Write 1 byte to the file
      FSDataOutputStream os = fs.create(file, replication);
      os.write(new byte[1]);
      os.hsync();
      os.close();
      fail("Expecting IOException file creation failure");
    } catch (IOException e) {
      // Exception can be ignored (expected)
    }

    // Ensure the reserved RBW space was released.
    assertEquals(
        "Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(),
        0, fsVolumeImpl.getReservedForReplicas());

    // Reserve some bytes to verify that reserved space is not cleared twice.
    fsVolumeImpl.reserveSpaceForReplica(1000);
    try {
      // Write 1 byte to the file
      FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"),
          replication);
      os.write(new byte[1]);
      os.hsync();
      os.close();
      fail("Expecting IOException file creation failure");
    } catch (IOException e) {
      // Exception can be ignored (expected)
    }

    // Ensure the reserved RBW space was released only once.
    assertEquals(1000, fsVolumeImpl.getReservedForReplicas());
  }

  @Test(timeout = 30000)
  public void testReservedSpaceInJMXBean() throws Exception {

    final short replication = 1;
    startCluster(BLOCK_SIZE, replication, -1);

    final String methodName = GenericTestUtils.getMethodName();
    final Path file = new Path("/" + methodName + ".01.dat");

    try (FSDataOutputStream os = fs.create(file, replication)) {
      // Write 1 byte to the file
      os.write(new byte[1]);
      os.hsync();

      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      final ObjectName mxbeanName = new ObjectName(
          "Hadoop:service=DataNode,name=DataNodeInfo");
      final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
          "VolumeInfo");

      // Verify reserved space for replicas appears in the JMX volume info.
      assertTrue(volumeInfo.contains("reservedSpaceForReplicas"));
    }
  }

  @Test(timeout = 300000)
  public void testTmpSpaceReserve() throws Exception {

    final short replication = 2;
    startCluster(BLOCK_SIZE, replication, -1);
    final int byteCount1 = 100;
    final int byteCount2 = 200;

    final String methodName = GenericTestUtils.getMethodName();

    // Test positive scenario
    {
      final Path file = new Path("/" + methodName + ".01.dat");

      try (FSDataOutputStream os = fs.create(file, (short) 1)) {
        // Write test data to the file
        os.write(new byte[byteCount1]);
        os.hsync();
      }

      BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
      String firstReplicaNode = blockLocations[0].getNames()[0];

      int newReplicaDNIndex = 0;
      if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
          .getDisplayName())) {
        newReplicaDNIndex = 1;
      }

      FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
          .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);

      performReReplication(file, true);

      assertEquals("Wrong reserve space for Tmp ", byteCount1,
          fsVolumeImpl.getRecentReserved());

      assertEquals("Reserved Tmp space is not released", 0,
          fsVolumeImpl.getReservedForReplicas());
    }

    // Test when file creation fails
    {
      final Path file = new Path("/" + methodName + ".02.dat");

      try (FSDataOutputStream os = fs.create(file, (short) 1)) {
        // Write test data to the file
        os.write(new byte[byteCount2]);
        os.hsync();
      }

      BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
      String firstReplicaNode = blockLocations[0].getNames()[0];

      int newReplicaDNIndex = 0;
      if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
          .getDisplayName())) {
        newReplicaDNIndex = 1;
      }

      BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
      Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any()))
          .thenThrow(new IOException("Synthetic IO Exception Through MOCK"));

      final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
          .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);

      // Reserve some bytes to verify that reserved space is not cleared
      // twice.
      fsVolumeImpl.reserveSpaceForReplica(1000);

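      // Inject the mocked BlockPoolSlice into the volume's bpSlices map via
      // reflection so that tmp file creation fails on this volume.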
      Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
      field.setAccessible(true);
      @SuppressWarnings("unchecked")
      Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
          .get(fsVolumeImpl);
      bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);

      performReReplication(file, false);

      assertEquals("Wrong reserve space for Tmp ", byteCount2,
          fsVolumeImpl.getRecentReserved());

      assertEquals("Tmp space is not released OR released twice", 1000,
          fsVolumeImpl.getReservedForReplicas());
    }
  }

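  /**
   * Raise the replication factor of the given file to 2. The initial sleep
   * gives the NameNode time to schedule the re-replication; if
   * waitForSuccess is true, poll the block locations until the new replica
   * appears.
   */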
  private void performReReplication(Path filePath, boolean waitForSuccess)
      throws Exception {
    fs.setReplication(filePath, (short) 2);

    Thread.sleep(4000);
    BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10);

    if (waitForSuccess) {
      // Wait for the re-replication to complete.
      while (blockLocations[0].getNames().length < 2) {
        Thread.sleep(2000);
        blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
      }
    }
  }

  /**
   * Stress test to ensure we are not leaking reserved space.
   * @throws IOException
   * @throws InterruptedException
   */
  @Test (timeout=600000)
  public void stressTest() throws IOException, InterruptedException {
    final int numWriters = 5;
    startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
    Writer[] writers = new Writer[numWriters];

    // Start a few writers and let them run for a while.
    for (int i = 0; i < numWriters; ++i) {
      writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
      writers[i].start();
    }

    Thread.sleep(60000);

    // Stop the writers.
    for (Writer w : writers) {
      w.stopWriter();
    }
    int filesCreated = 0;
    int numFailures = 0;
    for (Writer w : writers) {
      w.join();
      filesCreated += w.getFilesCreated();
      numFailures += w.getNumFailures();
    }

    LOG.info("Stress test created " + filesCreated +
             " files and hit " + numFailures + " failures");

    // Check no space was leaked.
    assertThat(singletonVolume.getReservedForReplicas(), is(0L));
  }

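  /**
   * Background writer that repeatedly creates a file, writes a random
   * amount of data, then closes and deletes it, counting the files created
   * and the failures hit along the way.
   */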
  private static class Writer extends Daemon {
    private volatile boolean keepRunning;
    private final DFSClient localClient;
    private int filesCreated = 0;
    private int numFailures = 0;
    byte[] data;

    Writer(DFSClient client, int blockSize) throws IOException {
      localClient = client;
      keepRunning = true;
      filesCreated = 0;
      numFailures = 0;

      // At least some of the files should span a block boundary.
      data = new byte[blockSize * 2];
    }

    @Override
    public void run() {
      /*
       * Create a file, write up to two blocks of data and close the file.
       * Do this in a loop until we are told to stop.
       */
      while (keepRunning) {
        OutputStream os = null;
        try {
          String filename = "/file-" + rand.nextLong();
          os = localClient.create(filename, false);
          os.write(data, 0, rand.nextInt(data.length));
          IOUtils.closeStream(os);
          os = null;
          localClient.delete(filename, false);
          Thread.sleep(50);     // Sleep for a bit to avoid killing the system.
          ++filesCreated;
        } catch (IOException ioe) {
          // Just ignore the exception and keep going.
          ++numFailures;
        } catch (InterruptedException ie) {
          return;
        } finally {
          if (os != null) {
            IOUtils.closeStream(os);
          }
        }
      }
    }

    public void stopWriter() {
      keepRunning = false;
    }

    public int getFilesCreated() {
      return filesCreated;
    }

    public int getNumFailures() {
      return numFailures;
    }
  }

  @Test(timeout = 30000)
  public void testReservedSpaceForAppend() throws Exception {
    final short replication = 3;
    startCluster(BLOCK_SIZE, replication, -1);
    final String methodName = GenericTestUtils.getMethodName();
    final Path file = new Path("/" + methodName + ".01.dat");

    // Write 1024 bytes to the file and close it.
    FSDataOutputStream os = fs.create(file, replication);
    os.write(new byte[1024]);
    os.close();

    final Path file2 = new Path("/" + methodName + ".02.dat");

    // Write 1 byte to the file and keep it open.
    FSDataOutputStream os2 = fs.create(file2, replication);
    os2.write(new byte[1]);
    os2.hflush();
    int expectedFile2Reserved = BLOCK_SIZE - 1;
    checkReservedSpace(expectedFile2Reserved);

    // Append one byte and verify reserved space before and after closing.
    os = fs.append(file);
    os.write(new byte[1]);
    os.hflush();
    int expectedFile1Reserved = BLOCK_SIZE - 1025;
    checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
    os.close();
    checkReservedSpace(expectedFile2Reserved);

    // Append one byte and verify reserved space before and after aborting.
    os = fs.append(file);
    os.write(new byte[1]);
    os.hflush();
    expectedFile1Reserved--;
    checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
    DFSTestUtil.abortStream(((DFSOutputStream) os.getWrappedStream()));
    checkReservedSpace(expectedFile2Reserved);
  }

  @Test(timeout = 30000)
  public void testReservedSpaceForPipelineRecovery() throws Exception {
    final short replication = 3;
    startCluster(BLOCK_SIZE, replication, -1);

    final String methodName = GenericTestUtils.getMethodName();
    final Path file = new Path("/" + methodName + ".01.dat");

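    // Save the current fault injector so shutdownCluster() can restore it.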
    old = DataNodeFaultInjector.get();
    // Fault injector that fails the mirror connection on the first attempt.
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      private int tries = 0;

      @Override
      public void failMirrorConnection() throws IOException {
        if (tries++ == 0) {
          throw new IOException("Failing Mirror for space reservation");
        }
      }
    });
    // Write 1 byte to the file and close it.
    FSDataOutputStream os = fs.create(file, replication);
    os.write(new byte[1]);
    os.close();
    // Ensure all space reserved for the replica was released on each
    // DataNode.
    cluster.triggerBlockReports();
    for (final DataNode dn : cluster.getDataNodes()) {
      try (FsDatasetSpi.FsVolumeReferences volumes =
          dn.getFSDataset().getFsVolumeReferences()) {
        final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            LOG.info("dn " + dn.getDisplayName() + " space : "
                + volume.getReservedForReplicas());
            return (volume.getReservedForReplicas() == 0);
          }
        }, 100, Integer.MAX_VALUE); // Wait until the test times out.
      }
    }
  }

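  /**
   * Wait until the first volume of every DataNode reports exactly
   * expectedReserved bytes reserved for replicas, timing out after 3
   * seconds per DataNode.
   */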
  private void checkReservedSpace(final long expectedReserved)
      throws TimeoutException, InterruptedException, IOException {
    for (final DataNode dn : cluster.getDataNodes()) {
      try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
          .getFsVolumeReferences()) {
        final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            LOG.info("dn " + dn.getDisplayName() + " space: "
                + volume.getReservedForReplicas()
                + ", expected reserved space: " + expectedReserved);
            return (volume.getReservedForReplicas() == expectedReserved);
          }
        }, 100, 3000);
      }
    }
  }

  @Test(timeout = 60000)
  public void testReservedSpaceForLeaseRecovery() throws Exception {
    final short replication = 3;
    conf.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
    conf.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
        1000);
    startCluster(BLOCK_SIZE, replication, -1);

    final String methodName = GenericTestUtils.getMethodName();
    final Path file = new Path("/" + methodName + ".01.dat");
    // Write 8192 bytes to the file and close it.
    FSDataOutputStream os = fs.create(file, replication);
    os.write(new byte[8192]);
    os.hflush();
    os.close();
    /*
     * Reset the pipeline for the append such that the DataNode which is
     * down is one of the mirrors, not the first DataNode.
     */
    HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
        .getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
    LocatedBlock lastBlock = blockLocation.getLocatedBlock();
    // Stop the 3rd node.
    cluster.stopDataNode(lastBlock.getLocations()[2].getName());
    try {
      os = fs.append(file);
      DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
          lastBlock);
      os.writeBytes("hi");
      os.hsync();
    } catch (IOException e) {
      // Append will fail because DataNodes cannot be replaced in a
      // 3-node cluster.
      LOG.info("", e);
    }
    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
    /*
     * There is a chance that the stopped DN could be chosen as the primary
     * for recovery. If so, recovery will not happen in time, so mark the
     * stopped node as dead to exclude it.
     */
    cluster.setDataNodeDead(lastBlock.getLocations()[2]);
    fs.recoverLease(file);
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        try {
          return fs.isFileClosed(file);
        } catch (IOException e) {
          return false;
        }
      }
    }, 500, 30000);
    checkReservedSpace(0);
  }

  /**
   * Ensure that the bytes reserved in a ReplicaInfo are cleared
   * during finalize.
   *
   * @throws IOException
   */
  @Test(timeout = 300000)
  public void testReplicaInfoBytesReservedReleasedOnFinalize() throws IOException {
    short replication = 3;
    int bufferLength = 4096;
    startCluster(BLOCK_SIZE, replication, -1);

    String methodName = GenericTestUtils.getMethodName();
    Path path = new Path("/" + methodName + ".01.dat");

    FSDataOutputStream fos =
        fs.create(path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), bufferLength,
            replication, BLOCK_SIZE, null);
    // Allocate a block.
    fos.write(new byte[bufferLength]);
    fos.hsync();

    DataNode dataNode = cluster.getDataNodes().get(0);
    FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
    long expectedReservedSpace = BLOCK_SIZE - bufferLength;

    String bpid = cluster.getNamesystem().getBlockPoolId();
    Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(fsDataSetImpl, bpid);
    ReplicaInfo r = replicas.iterator().next();

    // Verify the initial bytes reserved for the replica and the volume are
    // correct.
    assertEquals(expectedReservedSpace,
        fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas());
    assertEquals(expectedReservedSpace, r.getBytesReserved());

    // Verify the bytes reserved for the replica and the volume are cleared
    // after finalize.
    fsDataSetImpl.finalizeNewReplica(r, new ExtendedBlock(bpid, r));

    assertEquals(0L,
        fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas());
    assertEquals(0L, r.getBytesReserved());

    fos.close();
  }
}