/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.junit.Assert;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Base class for tests of striped file write operations with simulated
 * datanode failures.
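 *
 * <p>Concrete tests are expected to extend this class, optionally overriding
 * {@link #getEcSchema()} to exercise a different erasure coding schema.
 * A minimal, illustrative sketch (the subclass name, test method and file
 * length are hypothetical):
 * <pre>{@code
 * public class TestMyStripedOutputStreamWithFailure
 *     extends TestDFSStripedOutputStreamWithFailureBase {
 *   // annotate with org.junit.Test in a real subclass
 *   public void testKillOneDatanodeWhileWriting() {
 *     // kills each datanode in turn, halfway through the write
 *     runTest(blockGroupSize + cellSize + 123);
 *   }
 * }
 * }</pre>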
 */
public class TestDFSStripedOutputStreamWithFailureBase {
  public static final Logger LOG = LoggerFactory.getLogger(
      TestDFSStripedOutputStreamWithFailureBase.class);
  static {
    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(
        LoggerFactory.getLogger(BlockPlacementPolicy.class), Level.TRACE);
  }

  protected final int cellSize = 64 * 1024; // 64k
  protected final int stripesPerBlock = 4;
  protected ErasureCodingPolicy ecPolicy;
  protected int dataBlocks;
  protected int parityBlocks;
  protected int blockSize;
  protected int blockGroupSize;
  private int[][] dnIndexSuite;
  protected List<Integer> lengths;
  protected static final Random RANDOM = new Random();
  MiniDFSCluster cluster;
  DistributedFileSystem dfs;
  final Path dir = new Path("/"
      + TestDFSStripedOutputStreamWithFailureBase.class.getSimpleName());
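  // An offset just past nine full checksum chunks. runTest() skips any case
  // whose first kill position is at or before this offset, and samples a
  // block group's first generation stamp once the write position reaches it.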
  protected static final int FLUSH_POS =
      9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;

  public ECSchema getEcSchema() {
    return StripedFileTestUtil.getDefaultECPolicy().getSchema();
  }

  /*
   * Initialize erasure coding policy.
   */
  @Before
  public void init() {
    ecPolicy = new ErasureCodingPolicy(getEcSchema(), cellSize);
    dataBlocks = ecPolicy.getNumDataUnits();
    parityBlocks = ecPolicy.getNumParityUnits();
    blockSize = cellSize * stripesPerBlock;
    blockGroupSize = blockSize * dataBlocks;
    dnIndexSuite = getDnIndexSuite();
    lengths = newLengths();
  }

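  /**
   * Builds the list of file lengths to test: one length just past FLUSH_POS,
   * plus lengths one byte below, exactly at, and one byte above every cell
   * boundary within the first three block groups.
   */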
  List<Integer> newLengths() {
    final List<Integer> lens = new ArrayList<>();
    lens.add(FLUSH_POS + 2);
    for(int b = 0; b <= 2; b++) {
      for(int c = 0; c < stripesPerBlock * dataBlocks; c++) {
        for(int delta = -1; delta <= 1; delta++) {
          final int length = b * blockGroupSize + c * cellSize + delta;
          System.out.println(lens.size() + ": length=" + length
              + ", (b, c, d) = (" + b + ", " + c + ", " + delta + ")");
          lens.add(length);
        }
      }
    }
    return lens;
  }

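  /**
   * Builds the datanode-index combinations used by
   * {@link #runTestWithMultipleFailure(int)}: for up to two levels, while
   * more than one index is to be killed, pick at most {@code maxPerLevel}
   * random combinations of that many datanode indices from the block group.
   */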
  private int[][] getDnIndexSuite() {
    final int maxNumLevel = 2;
    final int maxPerLevel = 5;
    List<List<Integer>> allLists = new ArrayList<>();
    int numIndex = parityBlocks;
    for (int i = 0; i < maxNumLevel && numIndex > 1; i++) {
      List<List<Integer>> lists =
          combinations(dataBlocks + parityBlocks, numIndex);
      if (lists.size() > maxPerLevel) {
        Collections.shuffle(lists);
        lists = lists.subList(0, maxPerLevel);
      }
      allLists.addAll(lists);
      numIndex--;
    }
    int[][] dnIndexArray = new int[allLists.size()][];
    for (int i = 0; i < dnIndexArray.length; i++) {
      int[] list = new int[allLists.get(i).size()];
      for (int j = 0; j < list.length; j++) {
        list[j] = allLists.get(i).get(j);
      }
      dnIndexArray[i] = list;
    }
    return dnIndexArray;
  }

  // get all combinations of k integers from {0,...,n-1}
  private static List<List<Integer>> combinations(int n, int k) {
    List<List<Integer>> res = new LinkedList<List<Integer>>();
    if (k >= 1 && n >= k) {
      getComb(n, k, new Stack<Integer>(), res);
    }
    return res;
  }

  private static void getComb(int n, int k, Stack<Integer> stack,
      List<List<Integer>> res) {
    if (stack.size() == k) {
      List<Integer> list = new ArrayList<Integer>(stack);
      res.add(list);
    } else {
      int next = stack.empty() ? 0 : stack.peek() + 1;
      while (next < n) {
        stack.push(next);
        getComb(n, k, stack, res);
        next++;
      }
    }
    if (!stack.empty()) {
      stack.pop();
    }
  }

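  // Spreads num kill positions evenly across the file, excluding both ends.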
  int[] getKillPositions(int fileLen, int num) {
    int[] positions = new int[num];
    for (int i = 0; i < num; i++) {
      positions[i] = fileLen * (i + 1) / (num + 1);
    }
    return positions;
  }

  Integer getLength(int i) {
    return i >= 0 && i < lengths.size() ? lengths.get(i) : null;
  }

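  /**
   * Starts a MiniDFSCluster with exactly dataBlocks + parityBlocks datanodes,
   * registers and enables the erasure coding policy, and applies it to the
   * test directory.
   */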
  void setup(Configuration conf) throws IOException {
    System.out.println("NUM_DATA_BLOCKS  = " + dataBlocks);
    System.out.println("NUM_PARITY_BLOCKS= " + parityBlocks);
    System.out.println("CELL_SIZE        = " + cellSize + " (=" +
        StringUtils.TraditionalBinaryPrefix.long2String(cellSize, "B", 2)
        + ")");
    System.out.println("BLOCK_SIZE       = " + blockSize + " (=" +
        StringUtils.TraditionalBinaryPrefix.long2String(blockSize, "B", 2)
        + ")");
    System.out.println("BLOCK_GROUP_SIZE = " + blockGroupSize + " (=" +
        StringUtils.TraditionalBinaryPrefix.long2String(blockGroupSize, "B", 2)
        + ")");
    final int numDNs = dataBlocks + parityBlocks;
    if (ErasureCodeNative.isNativeCodeLoaded()) {
      conf.set(
          CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
          NativeRSRawErasureCoderFactory.CODER_NAME);
    }
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
    cluster.waitActive();
    dfs = cluster.getFileSystem();
    AddErasureCodingPolicyResponse[] res =
        dfs.addErasureCodingPolicies(new ErasureCodingPolicy[]{ecPolicy});
    ecPolicy = res[0].getPolicy();
    dfs.enableErasureCodingPolicy(ecPolicy.getName());
    DFSTestUtil.enableAllECPolicies(dfs);
    dfs.mkdirs(dir);
    dfs.setErasureCodingPolicy(dir, ecPolicy.getName());
  }

  void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

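  /**
   * Creates a configuration using the test block size, with load-based block
   * placement disabled, a 1s heartbeat interval, and replication streams
   * capped at 0 so the NameNode does not schedule reconstruction work while
   * datanodes are being killed.
   */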
  HdfsConfiguration newHdfsConfiguration() {
    final HdfsConfiguration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
        false);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
    return conf;
  }

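  /**
   * For each datanode in the block group, recreates the cluster, writes a
   * file of the given length, and kills that datanode once the write reaches
   * the midpoint of the file.
   */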
  void runTest(final int length) {
    final HdfsConfiguration conf = newHdfsConfiguration();
    for (int dn = 0; dn < dataBlocks + parityBlocks; dn++) {
      try {
        LOG.info("runTest: dn=" + dn + ", length=" + length);
        setup(conf);
        runTest(length, new int[]{length / 2}, new int[]{dn}, false);
      } catch (Throwable e) {
        final String err = "failed, dn=" + dn + ", length=" + length
            + ": " + StringUtils.stringifyException(e);
        LOG.error(err);
        Assert.fail(err);
      } finally {
        tearDown();
      }
    }
  }

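  /**
   * For each datanode-index combination in {@code dnIndexSuite}, recreates
   * the cluster, writes a file of the given length, and kills the selected
   * datanodes at evenly spaced positions within the file.
   */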
  void runTestWithMultipleFailure(final int length) throws Exception {
    final HdfsConfiguration conf = newHdfsConfiguration();
    for (int[] dnIndex : dnIndexSuite) {
      int[] killPos = getKillPositions(length, dnIndex.length);
      try {
        LOG.info("runTestWithMultipleFailure: length=" + length + ", killPos="
            + Arrays.toString(killPos) + ", dnIndex="
            + Arrays.toString(dnIndex));
        setup(conf);
        runTest(length, killPos, dnIndex, false);
      } catch (Throwable e) {
        final String err = "failed, killPos=" + Arrays.toString(killPos)
            + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
        LOG.error(err);
        throw e;
      } finally {
        tearDown();
      }
    }
  }

  /**
   * runTest implementation.
   * @param length file length
   * @param killPos kill positions in ascending order
   * @param dnIndex indices of the DNs to kill when the corresponding kill
   *                positions are reached
   * @param tokenExpire whether to wait for the block token to expire before
   *                    killing a DN
   * @throws Exception
   */
  void runTest(final int length, final int[] killPos,
      final int[] dnIndex, final boolean tokenExpire) throws Exception {
    if (killPos[0] <= FLUSH_POS) {
      LOG.warn("killPos=" + Arrays.toString(killPos) + " <= FLUSH_POS="
          + FLUSH_POS + ", length=" + length + ", dnIndex="
          + Arrays.toString(dnIndex));
      return; //skip test
    }
    Preconditions.checkArgument(length > killPos[0], "length=%s <= killPos=%s",
        length, killPos);
    Preconditions.checkArgument(killPos.length == dnIndex.length);

    final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex)
        + "len" + length + "kill" +  Arrays.toString(killPos));
    final String fullPath = p.toString();
    LOG.info("fullPath=" + fullPath);

    if (tokenExpire) {
      final NameNode nn = cluster.getNameNode();
      final BlockManager bm = nn.getNamesystem().getBlockManager();
      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();

      // set a short block token lifetime (6 seconds)
      SecurityTestUtil.setBlockTokenLifetime(sm, 6000L);
    }

    final AtomicInteger pos = new AtomicInteger();
    final FSDataOutputStream out = dfs.create(p);
    final DFSStripedOutputStream stripedOut
        = (DFSStripedOutputStream)out.getWrappedStream();

    // the first GS of this block group, i.e. the GS before any block
    // recovery has bumped it
    long firstGS = -1;
    long oldGS = -1; // the old GS before bumping
    List<Long> gsList = new ArrayList<>();
    final List<DatanodeInfo> killedDN = new ArrayList<>();
    int numKilled = 0;
    for(; pos.get() < length;) {
      final int i = pos.getAndIncrement();
      if (numKilled < killPos.length && i == killPos[numKilled]) {
        assertTrue(firstGS != -1);
        final long gs = getGenerationStamp(stripedOut);
        if (numKilled == 0) {
          assertEquals(firstGS, gs);
        } else {
          //TODO: implement hflush/hsync and verify that gs is strictly
          // greater than oldGS
          assertTrue(gs >= oldGS);
        }
        oldGS = gs;

        if (tokenExpire) {
          DFSTestUtil.flushInternal(stripedOut);
          waitTokenExpires(out);
        }

        killedDN.add(
            killDatanode(cluster, stripedOut, dnIndex[numKilled], pos));
        numKilled++;
      }

      write(out, i);

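      // sample this block group's initial generation stamp once the write
      // position reaches FLUSH_POS within the group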
      if (i % blockGroupSize == FLUSH_POS) {
        firstGS = getGenerationStamp(stripedOut);
        oldGS = firstGS;
      }
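      // record the last generation stamp observed for each completed block
      // group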
      if (i > 0 && (i + 1) % blockGroupSize == 0) {
        gsList.add(oldGS);
      }
    }
    gsList.add(oldGS);
    out.close();
    assertEquals(dnIndex.length, numKilled);

    StripedFileTestUtil.waitBlockGroupsReported(dfs, fullPath, numKilled);

    cluster.triggerBlockReports();
    StripedFileTestUtil.checkData(dfs, p, length, killedDN, gsList,
        blockGroupSize);
  }

  static void write(FSDataOutputStream out, int i) throws IOException {
    try {
      out.write(StripedFileTestUtil.getByte(i));
    } catch(IOException ioe) {
      throw new IOException("Failed at i=" + i, ioe);
    }
  }

  static long getGenerationStamp(DFSStripedOutputStream out)
      throws IOException {
    final long gs = out.getBlock().getGenerationStamp();
    LOG.info("getGenerationStamp returns " + gs);
    return gs;
  }

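  /**
   * Waits until the given streamer has a located block (polling every 100ms)
   * and returns its single datanode.
   */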
  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
    for(;;) {
      DatanodeInfo[] datanodes = streamer.getNodes();
      if (datanodes == null) {
        // try peeking at the following block.
        final LocatedBlock lb = streamer.peekFollowingBlock();
        if (lb != null) {
          datanodes = lb.getLocations();
        }
      }

      if (datanodes != null) {
        Assert.assertEquals(1, datanodes.length);
        Assert.assertNotNull(datanodes[0]);
        return datanodes[0];
      }

      try {
        Thread.sleep(100);
      } catch (InterruptedException ie) {
        Assert.fail(StringUtils.stringifyException(ie));
        return null;
      }
    }
  }

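  /**
   * Stops the datanode that the streamer at the given index is currently
   * writing to, forcing the striped writer onto its failure-handling path.
   */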
  static DatanodeInfo killDatanode(MiniDFSCluster cluster,
      DFSStripedOutputStream out, final int dnIndex, final AtomicInteger pos) {
    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
    final DatanodeInfo datanode = getDatanodes(s);
    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
    if (datanode != null) {
      cluster.stopDataNode(datanode.getXferAddr());
    }
    return datanode;
  }

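  // Polls until the output stream's current block token has expired.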
  private void waitTokenExpires(FSDataOutputStream out) throws IOException {
    Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
    while (!SecurityTestUtil.isBlockTokenExpired(token)) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException ignored) {
      }
    }
  }
}