TestLargeBlockReport.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;

import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;

/**
 * Tests that very large block reports can pass through the RPC server and
 * deserialization layers successfully if configured.
 */
public class TestLargeBlockReport {

  /**
   * Number of fake blocks placed in the report sent by each test.  Per the
   * tests below, a report of this size is expected to be rejected when the
   * IPC maximum data length is halved and accepted when it is doubled.
   */
  private static final int NUM_BLOCKS = 6000000;

  // Fresh configuration per test instance; each test tunes
  // IPC_MAXIMUM_DATA_LENGTH before the cluster is started.
  private final HdfsConfiguration conf = new HdfsConfiguration();
  private MiniDFSCluster cluster;
  private DataNode dn;
  private BPOfferService bpos;
  private DatanodeProtocolClientSideTranslatorPB nnProxy;
  private DatanodeRegistration bpRegistration;
  private String bpId;
  private DatanodeStorage dnStorage;
  private final long reportId = 1;
  private final long fullBrLeaseId = 0;

  /**
   * Lowers NameNode and FsDatasetImpl logging to WARN before any test runs.
   */
  @BeforeClass
  public static void init() {
    DFSTestUtil.setNameNodeLogLevel(Level.WARN);
    FsDatasetImplTestUtils.setFsDatasetImplLogLevel(Level.WARN);
  }

  /**
   * Shuts down the mini-cluster if a test started one.
   */
  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Sends a large block report to a NameNode whose IPC maximum data length
   * has been halved, and expects the call to fail.
   *
   * @throws Exception if cluster initialization fails
   */
  @Test
  public void testBlockReportExceedsLengthLimit() throws Exception {
    // protobuf's default limit increased to 2GB from protobuf 3.x onwards,
    // so no exception is expected from protobuf itself.  The failure is
    // expected to come from the RPC server's length validation instead.
    conf.setInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT / 2);
    initCluster();
    // Create a large enough report that we expect it will go beyond the RPC
    // server's length validation.
    StorageBlockReport[] reports = createReports(NUM_BLOCKS);
    try {
      sendBlockReport(reports);
      // fail() throws an AssertionError, which is not an Exception, so it is
      // not swallowed by the catch block below.
      fail("Should have failed because of the too long RPC data length");
    } catch (Exception e) {
      // Expected.  We can't reliably assert anything about the exception type
      // or the message.  The NameNode just disconnects, and the details are
      // buried in the NameNode log.
    }
  }

  /**
   * Sends the same large block report to a NameNode whose IPC maximum data
   * length has been doubled, and expects the call to complete without
   * throwing.
   *
   * @throws Exception if cluster initialization or the block report fails
   */
  @Test
  public void testBlockReportSucceedsWithLargerLengthLimit() throws Exception {
    conf.setInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT * 2);
    initCluster();
    StorageBlockReport[] reports = createReports(NUM_BLOCKS);
    sendBlockReport(reports);
  }

  /**
   * Sends the given reports to the NameNode through the DataNode's proxy,
   * using the registration and block pool captured by {@link #initCluster()}.
   *
   * @param reports storage block reports to send
   * @throws Exception if the RPC fails
   */
  private void sendBlockReport(StorageBlockReport[] reports) throws Exception {
    nnProxy.blockReport(bpRegistration, bpId, reports,
        new BlockReportContext(1, 0, reportId, fullBrLeaseId));
  }

  /**
   * Creates storage block reports, consisting of a single report with the
   * requested number of blocks.  The block data is fake, because the tests just
   * need to validate that the messages can pass correctly.  This intentionally
   * uses the old-style decoding method as a helper.  The test needs to cover
   * the new-style encoding technique.  Passing through that code path here
   * would trigger an exception before the test is ready to deal with it.
   *
   * @param numBlocks requested number of blocks
   * @return storage block reports
   */
  private StorageBlockReport[] createReports(int numBlocks) {
    final int headerLongs = 2;
    final int longsPerBlock = 3;
    final int payloadLongs = numBlocks * longsPerBlock;
    // Size the list for header + payload up front so that the very large
    // backing array never has to be grown and copied.
    List<Long> longs = new ArrayList<Long>(headerLongs + payloadLongs);
    // Two header entries precede the per-block data: the block count and 0.
    // NOTE(review): presumably the finalized and under-construction counts of
    // the legacy long-list format -- confirm against BlockListAsLongs.
    longs.add(Long.valueOf(numBlocks));
    longs.add(0L);
    // Fake per-block data: exactly longsPerBlock consecutive values per block.
    for (int i = 0; i < payloadLongs; ++i) {
      longs.add(Long.valueOf(i));
    }
    BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
    return new StorageBlockReport[] {
        new StorageBlockReport(dnStorage, blockList) };
  }

  /**
   * Start a mini-cluster, and set up everything the tests need to use it:
   * the single DataNode, its first block pool service, the NameNode proxy,
   * the registration, the block pool id and one of the DataNode's storages.
   *
   * @throws Exception if initialization fails
   */
  private void initCluster() throws Exception {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    dn = cluster.getDataNodes().get(0);
    bpos = dn.getAllBpOs().get(0);
    nnProxy = bpos.getActiveNN();
    bpRegistration = bpos.bpRegistration;
    bpId = bpos.getBlockPoolId();
    // Any storage will do; take the first one the dataset reports.
    dnStorage = dn.getFSDataset().getBlockReports(bpId).keySet().iterator()
        .next();
  }
}