
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.util.Preconditions;
import java.util.function.Supplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.event.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;

public class TestLeaseRecoveryStriped {
  public static final Logger LOG = LoggerFactory
      .getLogger(TestLeaseRecoveryStriped.class);

  private final ErasureCodingPolicy ecPolicy =
      StripedFileTestUtil.getDefaultECPolicy();
  private final int dataBlocks = ecPolicy.getNumDataUnits();
  private final int parityBlocks = ecPolicy.getNumParityUnits();
  private final int cellSize = ecPolicy.getCellSize();
  private final int stripeSize = dataBlocks * cellSize;
  private final int stripesPerBlock = 4;
  private final int blockSize = cellSize * stripesPerBlock;
  private final int blockGroupSize = blockSize * dataBlocks;
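  // HDFS checksum chunk size; used to jitter block lengths by sub-cell
  // amounts in the random length suites below.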
  private static final int bytesPerChecksum = 512;

  static {
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(DFSStripedOutputStream.LOG, Level.DEBUG);
    GenericTestUtils.setLogLevel(BlockRecoveryWorker.LOG, Level.DEBUG);
    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.DEBUG);
  }

  private static final String fakeUsername = "fakeUser1";
  private static final String fakeGroup = "supergroup";

  private MiniDFSCluster cluster;
  private DistributedFileSystem dfs;
  private Configuration conf;
  private final Path dir = new Path("/" + this.getClass().getSimpleName());
  private final Path p = new Path(dir, "testfile");
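  // Three of the four stripes in a block group: the whole file fits in a
  // single under-construction block group.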
  private final int testFileLength = (stripesPerBlock - 1) * stripeSize;

  @Before
  public void setup() throws IOException {
    conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000L);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
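    // Prevent the NameNode from scheduling reconstruction work, which could
    // interfere with the partially written block groups.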
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
    final int numDNs = dataBlocks + parityBlocks;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
    cluster.waitActive();
    dfs = cluster.getFileSystem();
    dfs.enableErasureCodingPolicy(ecPolicy.getName());
    dfs.mkdirs(dir);
    dfs.setErasureCodingPolicy(dir, ecPolicy.getName());
  }

  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  private static class BlockLengths {
    private final int[] blockLengths;
    private final long safeLength;

    BlockLengths(ErasureCodingPolicy policy, int[] blockLengths) {
      this.blockLengths = blockLengths;
      long[] longArray = Arrays.stream(blockLengths).asLongStream().toArray();
      this.safeLength = StripedBlockUtil.getSafeLength(policy, longArray);
    }

    @Override
    public String toString() {
      return new ToStringBuilder(this)
          .append("blockLengths", getBlockLengths())
          .append("safeLength", getSafeLength())
          .toString();
    }

    /**
     * Length of each block in a block group.
     */
    public int[] getBlockLengths() {
      return blockLengths;
    }

    /**
     * Safe length, calculated from the block lengths.
     */
    public long getSafeLength() {
      return safeLength;
    }
  }

  /**
   * Generate random block-length suites. Every internal block gets a random
   * whole number of cells; in the last suite the data block lengths are
   * additionally jittered by a few checksum chunks so they are not
   * cell-aligned.
   */
  private BlockLengths[] getBlockLengthsSuite() {
    final int groups = 4;
    final int minNumCell = 1;
    final int maxNumCell = stripesPerBlock;
    final int minNumDelta = -4;
    final int maxNumDelta = 2;
    BlockLengths[] suite = new BlockLengths[groups];
    Random random = ThreadLocalRandom.current();
    for (int i = 0; i < groups; i++) {
      int[] blockLengths = new int[dataBlocks + parityBlocks];
      for (int j = 0; j < blockLengths.length; j++) {
        // Choose a random number of cells for the block
        int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
        // For the last group only, jitter the data block lengths by a few
        // checksum chunks so they are not cell-aligned
        int numDelta = 0;
        if (i == groups - 1 && j < dataBlocks) {
          numDelta = random.nextInt(maxNumDelta - minNumDelta + 1) +
              minNumDelta;
        }
        blockLengths[j] = (cellSize * numCell) + (bytesPerChecksum * numDelta);
      }
      suite[i] = new BlockLengths(ecPolicy, blockLengths);
    }
    return suite;
  }

  private final BlockLengths[] blockLengthsSuite = getBlockLengthsSuite();

  @Test
  public void testLeaseRecovery() throws Exception {
    LOG.info("blockLengthsSuite: " +
        Arrays.toString(blockLengthsSuite));
    for (int i = 0; i < blockLengthsSuite.length; i++) {
      BlockLengths blockLengths = blockLengthsSuite[i];
      try {
        runTest(blockLengths.getBlockLengths(), blockLengths.getSafeLength());
      } catch (Throwable e) {
        String msg = "failed testCase at i=" + i + ", blockLengths="
            + blockLengths + "\n"
            + StringUtils.stringifyException(e);
        Assert.fail(msg);
      }
    }
  }

  /**
   * Test lease recovery for an EC policy when one internal block is located
   * on a stale datanode.
   */
  @Test
  public void testLeaseRecoveryWithStaleDataNode() {
    LOG.info("blockLengthsSuite: " +
        Arrays.toString(blockLengthsSuite));
    long staleInterval = conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);

    for (int i = 0; i < blockLengthsSuite.length; i++) {
      BlockLengths blockLengths = blockLengthsSuite[i];
      try {
        writePartialBlocks(blockLengths.getBlockLengths());

        // Get block info for the last block and mark corresponding datanode
        // as stale.
        LocatedBlock locatedblock =
            TestInterDatanodeProtocol.getLastLocatedBlock(
                dfs.dfs.getNamenode(), p.toString());
        DatanodeInfo firstDataNode = locatedblock.getLocations()[0];
        DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
            .getBlockManager().getDatanodeManager()
            .getDatanode(firstDataNode);
        DataNodeTestUtils.setHeartbeatsDisabledForTests(
            cluster.getDataNode(dnDes.getIpcPort()), true);
        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));

        // Recompute the safe length over the remaining internal blocks;
        // the replica on the stale datanode is expected to be excluded
        // from block recovery.
        long[] longArray = new long[blockLengths.getBlockLengths().length - 1];
        for (int j = 0; j < longArray.length; ++j) {
          longArray[j] = blockLengths.getBlockLengths()[j + 1];
        }
        int safeLength = (int) StripedBlockUtil.getSafeLength(ecPolicy,
            longArray);
        int checkDataLength = Math.min(testFileLength, safeLength);
        recoverLease();
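        // The block is expected to carry generation stamp 1001 (the first GS
        // a fresh MiniDFSCluster hands out); recovery bumps the GS, making
        // 1001 an old one.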
        List<Long> oldGS = new ArrayList<>();
        oldGS.add(1001L);
        StripedFileTestUtil.checkData(dfs, p, checkDataLength,
            new ArrayList<>(), oldGS, blockGroupSize);

        DataNodeTestUtils.setHeartbeatsDisabledForTests(
            cluster.getDataNode(dnDes.getIpcPort()), false);
        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);

      } catch (Throwable e) {
        String msg = "failed testCase at i=" + i + ", blockLengths="
            + blockLengths + "\n"
            + StringUtils.stringifyException(e);
        Assert.fail(msg);
      }
    }
  }

  @Test
  public void testSafeLength() {
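    // With all six data blocks the same length L, the safe length is the
    // number of complete cells (L / cellSize) times the full stripe size of
    // 6 * cellSize, where cellSize is 1 MiB for the default RS-6-3-1024k
    // policy.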
    checkSafeLength(0, 0); // Length of: 0
    checkSafeLength(1024 * 1024, 6291456L); // Length of: 1 MiB
    checkSafeLength(64 * 1024 * 1024, 402653184L); // Length of: 64 MiB
    checkSafeLength(189729792, 1132462080L); // Length of: ~181 MiB (not cell-aligned)
    checkSafeLength(256 * 1024 * 1024, 1610612736L); // Length of: 256 MiB
    checkSafeLength(517399040, 3101687808L); // Length of: ~493 MiB (not cell-aligned)
    checkSafeLength(1024 * 1024 * 1024, 6442450944L); // Length of: 1 GiB
  }

  /**
   * 1. Write 1MB of data, then flush it.
   * 2. Simulate the client dying silently by stopping all block streams.
   * 3. Trigger lease recovery manually.
   * 4. Verify that lease recovery succeeds.
   */
  @Test
  public void testLeaseRecoveryWithManyZeroLengthReplica() {
    int curCellSize = 1024 * 1024;
    try {
      final FSDataOutputStream out = dfs.create(p);
      final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
          .getWrappedStream();
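      // Writing exactly one cell (1 MiB under the default policy) sends data
      // to the first data streamer only, leaving the other eight replicas at
      // length zero.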
      for (int pos = 0; pos < curCellSize; pos++) {
        out.write(StripedFileTestUtil.getByte(pos));
      }
      for (int i = 0; i < dataBlocks + parityBlocks; i++) {
        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
        waitStreamerAllAcked(s);
        stopBlockStream(s);
      }
      recoverLease();
      LOG.info("Trigger recover lease manually successfully.");
    } catch (Throwable e) {
      String msg = "failed testCase" + StringUtils.stringifyException(e);
      Assert.fail(msg);
    }
  }

  /**
   * Verify the safe length computed for six equal-length internal blocks
   * (six matches the data unit count of the default RS-6-3 policy).
   */
  private void checkSafeLength(int blockLength, long expectedSafeLength) {
    int[] blockLengths = new int[]{blockLength, blockLength, blockLength,
        blockLength, blockLength, blockLength};
    long safeLength = new BlockLengths(ecPolicy, blockLengths).getSafeLength();
    Assert.assertEquals(expectedSafeLength, safeLength);
  }

  private void runTest(int[] blockLengths, long safeLength) throws Exception {
    writePartialBlocks(blockLengths);

    int checkDataLength = Math.min(testFileLength, (int)safeLength);

    recoverLease();

    // The block is expected to carry generation stamp 1001 (the first GS a
    // fresh MiniDFSCluster hands out); recovery bumps the GS, making 1001
    // an old one.
    List<Long> oldGS = new ArrayList<>();
    oldGS.add(1001L);
    StripedFileTestUtil.checkData(dfs, p, checkDataLength,
        new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
    // After recovery, the storages are reported by the primary DN. We should
    // also verify the storages reported via block reports after a NameNode
    // restart.
    cluster.restartNameNode(true);
    cluster.waitFirstBRCompleted(0, 10000);
    StripedFileTestUtil.checkData(dfs, p, checkDataLength,
        new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
  }

  /**
   * Write a file with blocks of different lengths.
   *
   * This method depends on completing before the DFS socket timeout.
   * Otherwise, the client will mark timed-out streamers as failed, and the
   * write will fail if there are too many failed streamers.
   *
   * @param blockLengths lengths of blocks to write
   * @throws Exception
   */
  private void writePartialBlocks(int[] blockLengths) throws Exception {
    final FSDataOutputStream out = dfs.create(p);
    final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
        .getWrappedStream();
    int[] posToKill = getPosToKill(blockLengths);
    int checkingPos = nextCheckingPos(posToKill, 0);
    Set<Integer> stoppedStreamerIndexes = new HashSet<>();
    try {
      for (int pos = 0; pos < testFileLength; pos++) {
        out.write(StripedFileTestUtil.getByte(pos));
        if (pos == checkingPos) {
          for (int index : getIndexToStop(posToKill, pos)) {
            out.flush();
            stripedOut.enqueueAllCurrentPackets();
            LOG.info("Stopping block stream idx {} at file offset {} block " +
                    "length {}", index, pos, blockLengths[index]);
            StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
            waitStreamerAllAcked(s);
            waitByteSent(s, blockLengths[index]);
            stopBlockStream(s);
            stoppedStreamerIndexes.add(index);
          }
          checkingPos = nextCheckingPos(posToKill, pos);
        }
      }
    } finally {
      // Flush everything
      out.flush();
      stripedOut.enqueueAllCurrentPackets();
      // Wait for streamers that weren't killed above to be written out
      for (int i = 0; i < blockLengths.length; i++) {
        if (stoppedStreamerIndexes.contains(i)) {
          continue;
        }
        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
        LOG.info("Waiting for block stream idx {} to reach length {}", i,
            blockLengths[i]);
        waitStreamerAllAcked(s);
      }
      DFSTestUtil.abortStream(stripedOut);
    }
  }

  /**
   * Return the smallest kill position strictly greater than curPos, i.e.
   * the next file offset at which one or more streamers must be stopped.
   */
  private int nextCheckingPos(int[] posToKill, int curPos) {
    int checkingPos = Integer.MAX_VALUE;
    for (int i = 0; i < posToKill.length; i++) {
      if (posToKill[i] > curPos) {
        checkingPos = Math.min(checkingPos, posToKill[i]);
      }
    }
    return checkingPos;
  }

  private int[] getPosToKill(int[] blockLengths) {
    int[] posToKill = new int[dataBlocks + parityBlocks];
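    // In the round-robin striped layout, byte c of stripe s in data block i
    // sits at file offset s * stripeSize + i * cellSize + c; stop each data
    // streamer at the offset just past its block's last byte.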
    for (int i = 0; i < dataBlocks; i++) {
      int numStripe = (blockLengths[i] - 1) / cellSize;
      posToKill[i] = numStripe * stripeSize + i * cellSize
          + blockLengths[i] % cellSize;
      if (blockLengths[i] % cellSize == 0) {
        posToKill[i] += cellSize;
      }
    }
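    // Parity cells are only produced when a stripe completes, so parity
    // streamers are stopped at the end of their last full stripe.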
    for (int i = dataBlocks; i < dataBlocks + parityBlocks; i++) {
      Preconditions.checkArgument(blockLengths[i] % cellSize == 0);
      int numStripe = blockLengths[i] / cellSize;
      posToKill[i] = numStripe * stripeSize;
    }
    return posToKill;
  }

  /** Collect the indices of all streamers whose kill position equals pos. */
  private List<Integer> getIndexToStop(int[] posToKill, int pos) {
    List<Integer> indices = new LinkedList<>();
    for (int i = 0; i < posToKill.length; i++) {
      if (pos == posToKill[i]) {
        indices.add(i);
      }
    }
    return indices;
  }

  private void waitByteSent(final StripedDataStreamer s, final long byteSent)
      throws Exception {
    try {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          return s.bytesSent >= byteSent;
        }
      }, 100, 30000);
    } catch (TimeoutException e) {
      throw new IOException("Timeout waiting for streamer " + s + ". Sent="
          + s.bytesSent + ", expected=" + byteSent);
    }
  }

  /**
   * Stop the block stream without immediately inducing a hard failure.
   * Packets can continue to be queued until the streamer hits a socket timeout.
   *
   * @param s the streamer whose block stream should be stopped
   * @throws Exception
   */
  private void stopBlockStream(StripedDataStreamer s) throws Exception {
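    // Swap the streamer's socket output for a null sink via reflection so
    // queued packets are silently discarded instead of failing fast.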
    IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    Whitebox.setInternalState(s, "blockStream",
        new DataOutputStream(nullOutputStream));
  }

  private void recoverLease() throws Exception {
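    // Recover the lease as a separate client user, polling until the
    // NameNode reports the file closed (recoverLease returns true).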
    final DistributedFileSystem dfs2 =
        (DistributedFileSystem) getFSAsAnotherUser(conf);
    try {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          try {
            return dfs2.recoverLease(p);
          } catch (IOException e) {
            return false;
          }
        }
      }, 5000, 24000);
    } catch (TimeoutException e) {
      throw new IOException("Timeout waiting for recoverLease()");
    }
  }

  private FileSystem getFSAsAnotherUser(final Configuration c)
      throws IOException, InterruptedException {
    return FileSystem.get(FileSystem.getDefaultUri(c), c,
        UserGroupInformation
            .createUserForTesting(fakeUsername, new String[] { fakeGroup })
            .getUserName());
  }

  /**
   * Block until every packet queued so far on the streamer has been acked.
   */
  public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
    long toWaitFor = s.getLastQueuedSeqno();
    s.waitForAckedSeqno(toWaitFor);
  }
}