
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

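/**
 * Tests {@link LocatedBlocksRefresher}, the background thread that periodically
 * re-fetches block locations for registered {@link DFSInputStream}s when
 * {@code DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY} is set to a non-zero
 * interval.
 */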
public class TestLocatedBlocksRefresher {
  private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class);

  private static final int BLOCK_SIZE = 1024 * 1024;
  private static final short REPLICATION_FACTOR = (short) 4;
  private static final String[] RACKS = new String[] {
      "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
  private static final int NUM_DATA_NODES = RACKS.length;

  private final int numOfBlocks = 24;
  private final int fileLength = numOfBlocks * BLOCK_SIZE;
  private final int dfsClientPrefetchSize = fileLength / 2;

  private MiniDFSCluster cluster;
  private Configuration conf;

  @BeforeEach
  public void setUp() throws Exception {
    cluster = null;
    conf = new HdfsConfiguration();

    // disable short-circuit reads
    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
    // set replication factor
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
    // set block size and client prefetch size
    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
        dfsClientPrefetchSize);
  }

  @AfterEach
  public void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown(true, true);
    }
  }

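  /**
   * Sets the refresh interval, forces a unique client context so no state is
   * cached between runs, and starts a mini cluster across the configured racks.
   */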
  private void setupTest(long refreshInterval) throws IOException {
    conf.setLong(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, refreshInterval);

    // use a unique client context per run so cached state is not shared
    // between tests
    conf.set("dfs.client.context", UUID.randomUUID().toString());

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
    cluster.waitActive();
  }

  @Test
  public void testDisabledOnZeroInterval() throws IOException {
    setupTest(0);
    assertNull(cluster.getFileSystem().getClient().getLocatedBlockRefresher());
  }

  @Test
  public void testEnabledOnNonZeroInterval() throws Exception {
    setupTest(1000);
    LocatedBlocksRefresher refresher =
        cluster.getFileSystem().getClient().getLocatedBlockRefresher();
    assertNotNull(refresher);
    assertNoMoreRefreshes(refresher);
  }

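  /**
   * End-to-end scenario: open a stream, stop datanodes hosting its first block,
   * and verify the refresher swaps in fresh located blocks and clears the
   * stream's local dead nodes after each failure.
   */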
  @Test
  public void testRefreshOnDeadNodes() throws Exception {
    setupTest(1000);
    DistributedFileSystem fs = cluster.getFileSystem();
    DFSClient client = fs.getClient();
    LocatedBlocksRefresher refresher = client.getLocatedBlockRefresher();

    String fileName = createTestFile(fs);

    try (DFSInputStream fin = client.open(fileName)) {
      LocatedBlocks locatedBlocks = fin.locatedBlocks;
      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE, locatedBlocks.locatedBlockCount());

      // should not be tracked yet
      assertFalse(refresher.isInputStreamTracked(fin));

      // track and verify
      refresher.addInputStream(fin);
      assertTrue(refresher.isInputStreamTracked(fin));

      // no refreshes yet, as nothing has happened
      assertNoMoreRefreshes(refresher);
      synchronized (fin.infoLock) {
        assertSame(locatedBlocks, fin.locatedBlocks);
      }

      stopNodeHostingBlocks(fin, NUM_DATA_NODES - 1);

      // read some data, which should mark the stopped node as a local dead node
      int chunkReadSize = BLOCK_SIZE / 4;
      byte[] readBuffer = new byte[chunkReadSize];
      fin.read(0, readBuffer, 0, readBuffer.length);

      assertEquals(1, fin.getLocalDeadNodes().size());

      // we should get a refresh now
      assertRefreshes(refresher, 1);

      // verify that it actually changed things
      synchronized (fin.infoLock) {
        assertNotSame(locatedBlocks, fin.locatedBlocks);
        assertTrue(fin.getLocalDeadNodes().isEmpty());
      }

      // no more refreshes because everything is happy again
      assertNoMoreRefreshes(refresher);

      // stop another node, and try to trigger a new deadNode
      stopNodeHostingBlocks(fin, NUM_DATA_NODES - 2);
      readBuffer = new byte[chunkReadSize];
      fin.read(0, readBuffer, 0, readBuffer.length);

      // we should refresh again now, and verify.
      // there may actually be more than one dead node, since the first
      // stopped node may still be listed in the replicas for the block
      assertTrue(fin.getLocalDeadNodes().size() > 0);

      assertRefreshes(refresher, 1);

      synchronized (fin.infoLock) {
        assertNotSame(locatedBlocks, fin.locatedBlocks);
        assertTrue(fin.getLocalDeadNodes().isEmpty());
      }

      // de-register, and expect no more refreshes below
      refresher.removeInputStream(fin);
    }

    assertNoMoreRefreshes(refresher);
  }

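  /**
   * Stops one datanode hosting a replica of the stream's first block and
   * verifies the cluster then reports {@code expectedNodes} live datanodes.
   */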
  private void stopNodeHostingBlocks(DFSInputStream fin, int expectedNodes) {
    synchronized (fin.infoLock) {
      int idx = fin.locatedBlocks.findBlock(0);
      DatanodeInfo[] locations = fin.locatedBlocks.get(idx).getLocations();
      for (int i = 0; i < locations.length; i++) {
        String deadNodeAddr = locations[i].getXferAddr();

        DataNodeProperties dataNodeProperties = cluster.stopDataNode(deadNodeAddr);
        if (dataNodeProperties != null) {
          List<DataNode> datanodesPostStoppage = cluster.getDataNodes();
          assertEquals(expectedNodes, datanodesPostStoppage.size());
          return;
        }
      }

      throw new RuntimeException("Could not find a datanode to stop");
    }
  }

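  /**
   * Waits for the refresher to complete several more runs and asserts its
   * refresh count is unchanged, i.e. no tracked stream needed refreshing.
   */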
  private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher) throws InterruptedException {
    long interval = refresher.getInterval();
    int runCount = refresher.getRunCount();
    int refreshCount = refresher.getRefreshCount();

    LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes",
        runCount + 3, runCount);
    // wait for at least 3 more runs, with a generous timeout buffer
    awaitWithTimeout(() -> refresher.getRunCount() >= runCount + 3, 5 * interval);

    // it should not have refreshed anything, because either no DFSInputStreams
    // are registered or none of them needed a refresh
    assertEquals(refreshCount, refresher.getRefreshCount());
  }

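  /**
   * Waits for the refresher to complete several more runs and asserts that
   * exactly {@code expectedRefreshes} additional refreshes occurred.
   */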
  private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes)
      throws InterruptedException {
    int runCount = refresher.getRunCount();
    int refreshCount = refresher.getRefreshCount();
    int expectedRuns = 3;

    if (expectedRefreshes < 0) {
      expectedRefreshes = expectedRuns;
    }

    LOG.info(
        "Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
        runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount
    );

    // wait for at least expectedRuns more runs
    awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000);

    // the absolute counts may differ due to refreshes that occurred before we
    // opened the DFSInputStream, but the delta in refresh count should match
    // exactly since we refresh on every run
    assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount);
  }

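  /**
   * Polls the condition every 50ms, failing the test if it has not become
   * true within {@code timeoutMillis}.
   */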
  private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis)
      throws InterruptedException {
    long start = Time.monotonicNow();

    while (!test.get()) {
      if (Time.monotonicNow() - start > timeoutMillis) {
        fail("Timed out waiting for condition to become true");
      }

      Thread.sleep(50);
    }
  }

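  /**
   * Writes a {@code numOfBlocks}-block file with a unique name, schedules it
   * for deletion on exit, and returns its path.
   */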
  private String createTestFile(FileSystem fs) throws IOException {
    String fileName = "/located_blocks_" + UUID.randomUUID().toString();
    Path filePath = new Path(fileName);
    try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) {
      fout.write(new byte[fileLength]);
    }
    fs.deleteOnExit(filePath);

    return fileName;
  }
}