TestDFSInputStreamBlockLocations.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Test the expiration of cached block locations in DFSInputStream.
 */
@RunWith(Parameterized.class)
public class TestDFSInputStreamBlockLocations {
  private static final int BLOCK_SIZE = 1024 * 1024;
  private static final String[] RACKS = new String[] {
      "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
  private static final int NUM_DATA_NODES = RACKS.length;
  private static final short REPLICATION_FACTOR = (short) 4;
  private final int staleInterval = 8000;
  private final int numOfBlocks = 24;
  private final int fileLength = numOfBlocks * BLOCK_SIZE;
  private final int dfsClientPrefetchSize = fileLength / 2;
  // locatedBlocks expiration set to 1 hour
  private final long dfsInputLocationsTimeout = 60 * 60 * 1000L;

  private HdfsConfiguration conf;
  private MiniDFSCluster dfsCluster;
  private DFSClient dfsClient;
  private DistributedFileSystem fs;
  private Path filePath;
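  // whether this parameterized run enables periodic refresh of cached block locations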
  private boolean enableBlkExpiration;

  @Parameterized.Parameters(name = "{index}: CacheExpirationConfig(Enable {0})")
  public static Collection<Object[]> getTestParameters() {
    return Arrays.asList(new Object[][] {
        {Boolean.TRUE},
        {Boolean.FALSE}
    });
  }

  public TestDFSInputStreamBlockLocations(Boolean enableExpiration) {
    enableBlkExpiration = enableExpiration;
  }

  @Before
  public void setup() throws IOException {
    conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
    // set the heartbeat intervals and stale considerations
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
        staleInterval);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        staleInterval / 2);
    // disable shortcircuit reading
    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
    // set replication factor
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
    // set block size and other sizes
    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
        dfsClientPrefetchSize);
    if (enableBlkExpiration) {
      // set the refresh locations for every dfsInputLocationsTimeout
      conf.setLong(
          HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
          dfsInputLocationsTimeout);
    }
    // start the cluster and create a DFSClient
    dfsCluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
    dfsCluster.waitActive();
    assertEquals(NUM_DATA_NODES, dfsCluster.getDataNodes().size());
    InetSocketAddress addr = new InetSocketAddress("localhost",
        dfsCluster.getNameNodePort());
    dfsClient = new DFSClient(addr, conf);
    fs = dfsCluster.getFileSystem();
  }

  @After
  public void teardown() throws IOException {
    if (dfsClient != null) {
      dfsClient.close();
      dfsClient = null;
    }
    if (fs != null) {
      fs.deleteOnExit(filePath);
      fs.close();
      fs = null;
    }
    if (dfsCluster != null) {
      dfsCluster.shutdown();
      dfsCluster = null;
    }
  }

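  /**
   * Verify that refreshBlockLocations is a no-op when there are no locally
   * dead or non-local datanodes, and that adding a dead node (or supplying
   * only unresolved datanode addresses) triggers a refresh that replaces the
   * cached LocatedBlocks and clears the local dead-node list.
   */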
  @Test
  public void testRefreshBlockLocations() throws IOException {
    final String fileName = "/test_cache_locations";
    filePath = createFile(fileName);

    try (DFSInputStream fin = dfsClient.open(fileName)) {
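      // capture the cached block locations and the last refresh timestamp
      // before any refresh is attempted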
      LocatedBlocks existing = fin.locatedBlocks;
      long lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();

      assertFalse("should not have attempted refresh",
          fin.refreshBlockLocations(null));
      assertEquals("should not have updated lastRefreshedAt",
          lastRefreshedAt, fin.getLastRefreshedBlocksAtForTesting());
      assertSame("should not have modified locatedBlocks",
          existing, fin.locatedBlocks);

      // fake a dead node to force a refresh
      // refreshBlockLocations should return true, indicating a refresh was
      // attempted; the locations themselves have not moved, but the cached
      // LocatedBlocks object is replaced and the dead-node list is cleared
      fin.addToLocalDeadNodes(dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]);
      assertTrue("should have attempted refresh",
          fin.refreshBlockLocations(null));
      verifyChanged(fin, existing, lastRefreshedAt);

      // reset
      lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
      existing = fin.locatedBlocks;

      // It's hard to test explicitly for non-local nodes, but we can fake it
      // because we also treat unresolved as non-local. Pass in a cache where all the datanodes
      // are unresolved hosts.
      Map<String, InetSocketAddress> mockAddressCache = new HashMap<>();
      InetSocketAddress unresolved = InetSocketAddress.createUnresolved("www.google.com", 80);
      for (DataNode dataNode : dfsCluster.getDataNodes()) {
        mockAddressCache.put(dataNode.getDatanodeUuid(), unresolved);
      }

      assertTrue("should have attempted refresh",
          fin.refreshBlockLocations(mockAddressCache));
      verifyChanged(fin, existing, lastRefreshedAt);
    }
  }

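  /**
   * Assert that a refresh actually happened: the refresh timestamp advanced,
   * the cached LocatedBlocks object was replaced, and the local dead-node
   * list was cleared.
   */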
  private void verifyChanged(DFSInputStream fin, LocatedBlocks existing, long lastRefreshedAt) {
    assertTrue("lastRefreshedAt should have incremented",
        fin.getLastRefreshedBlocksAtForTesting() > lastRefreshedAt);
    assertNotSame("located blocks should have changed",
        existing, fin.locatedBlocks);
    assertTrue("deadNodes should be empty",
        fin.getLocalDeadNodes().isEmpty());
  }

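  /** Deferred registration via the stateful read path. */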
  @Test
  public void testDeferredRegistrationStatefulRead() throws IOException {
    testWithRegistrationMethod(DFSInputStream::read);
  }

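  /** Deferred registration via the positional read path. */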
  @Test
  public void testDeferredRegistrationPositionalRead() throws IOException {
    testWithRegistrationMethod(fin -> fin.readFully(0, new byte[1]));
  }

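  /** Deferred registration via getAllBlocks. */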
  @Test
  public void testDeferredRegistrationGetAllBlocks() throws IOException {
    testWithRegistrationMethod(DFSInputStream::getAllBlocks);
  }

  /**
   * If the ignore list contains all datanodes, chooseDataNode should clear it
   * to take advantage of its built-in retries. This is needed for hedged
   * reads.
   * @throws IOException
   */
  @Test
  public void testClearIgnoreListChooseDataNode() throws IOException {
    final String fileName = "/test_cache_locations";
    filePath = createFile(fileName);

    try (DFSInputStream fin = dfsClient.open(fileName)) {
      LocatedBlocks existing = fin.locatedBlocks;
      LocatedBlock block = existing.getLastLocatedBlock();
      ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
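      // with every location ignored, chooseDataNode should clear the ignore
      // list and still return a usable node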
      Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
      Assert.assertEquals(0, ignoreList.size());
    }
  }

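  /** A DFSInputStream consumer that may throw, used to vary the read path under test. */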
  @FunctionalInterface
  interface ThrowingConsumer {
    void accept(DFSInputStream fin) throws IOException;
  }

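  /**
   * Exercise the given read method twice: immediately after open, when the
   * stream should not yet be tracked by the LocatedBlockRefresher, and again
   * after artificially aging the last-refresh timestamp past the refresh
   * interval, when the stream should be tracked only if expiration is
   * enabled. Closing the stream must always deregister it.
   */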
  private void testWithRegistrationMethod(ThrowingConsumer registrationMethod) throws IOException {
    final String fileName = "/test_cache_locations";
    filePath = createFile(fileName);

    DFSInputStream fin = null;
    try {
      fin = dfsClient.open(fileName);
      assertFalse("should not be tracking input stream on open",
          dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));

      // still not registered, because the refresh interval (1 hour) has not elapsed yet
      registrationMethod.accept(fin);
      assertFalse("should not be tracking input stream after first read",
          dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));

      // artificially age the last-refresh timestamp past the refresh interval
      fin.setLastRefreshedBlocksAtForTesting(Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
      registrationMethod.accept(fin);
      assertEquals("SHOULD be tracking input stream on read after interval, only if enabled",
          enableBlkExpiration, dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
    } finally {
      if (fin != null) {
        fin.close();
        assertFalse(dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
      }
      fs.delete(filePath, true);
    }
  }

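  /**
   * Create a file of fileLength bytes (numOfBlocks blocks of BLOCK_SIZE)
   * with the configured replication factor.
   */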
  private Path createFile(String fileName) throws IOException {
    Path path = new Path(fileName);
    try (FSDataOutputStream fout = fs.create(path, REPLICATION_FACTOR)) {
      fout.write(new byte[fileLength]);
    }
    return path;
  }
}