TestStateAlignmentContextWithHA.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Tests that the server sends state alignment information to clients via RPC
 * and that clients receive it and update their last known state alignment
 * info accordingly.
 * These tests check that after a single RPC call a client has caught up to
 * the most recent alignment state of the server.
 */
public class TestStateAlignmentContextWithHA {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestStateAlignmentContextWithHA.class.getName());

  private static final int NUMDATANODES = 1;
  private static final int NUMCLIENTS = 10;
  private static final int NUMFILES = 120;
  private static final Configuration CONF = new HdfsConfiguration();
  private static final List<ClientGSIContext> AC_LIST = new ArrayList<>();

  private static MiniQJMHACluster qjmhaCluster;
  private static MiniDFSCluster cluster;
  private static List<Worker> clients;

  private DistributedFileSystem dfs;
  private int active = 0;
  private int standby = 1;

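  /**
   * {@link ObserverReadProxyProvider} subclass that records each client's
   * {@link ClientGSIContext} in {@code AC_LIST}, so the tests can inspect
   * the last seen state id of every client that was created.
   */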
  static class ORPPwithAlignmentContexts<T extends ClientProtocol>
      extends ObserverReadProxyProvider<T> {

    public ORPPwithAlignmentContexts(
        Configuration conf, URI uri, Class<T> xface,
        HAProxyFactory<T> factory) throws IOException {
      super(conf, uri, xface, factory);

      AC_LIST.add((ClientGSIContext) getAlignmentContext());
    }
  }

  @BeforeAll
  public static void startUpCluster() throws IOException {
    // Set short retry timeouts so this test runs faster
    CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
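    // Disable FileSystem caching so each lookup creates a distinct client
    // (and therefore a distinct alignment context).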
    CONF.setBoolean(String.format(
        "fs.%s.impl.disable.cache", HdfsConstants.HDFS_URI_SCHEME), true);
    CONF.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, NUMDATANODES);
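    // Enable the NameNode to send its state id to clients in RPC responses.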
    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);

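    // One observer NameNode; the trailing boolean flag of
    // HATestUtil#setUpObserverCluster enables fast edit tailing.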
    qjmhaCluster = HATestUtil.setUpObserverCluster(CONF, 1, NUMDATANODES, true);
    cluster = qjmhaCluster.getDfsCluster();
  }

  @BeforeEach
  public void before() throws IOException, URISyntaxException {
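    // Create a fresh observer-read client for each test; the proxy provider
    // above registers its alignment context in AC_LIST (index 0).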
    dfs = HATestUtil.configureObserverReadFs(
        cluster, CONF, ORPPwithAlignmentContexts.class, true);
  }

  @AfterAll
  public static void shutDownCluster() throws IOException {
    if (qjmhaCluster != null) {
      qjmhaCluster.shutdown();
    }
  }

  @AfterEach
  public void after() throws IOException {
    killWorkers();
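    // Restore NN0 as active and NN1 as standby so every test starts from
    // the same HA state.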
    cluster.transitionToStandby(1);
    cluster.transitionToActive(0);
    active = 0;
    standby = 1;
    if (dfs != null) {
      dfs.close();
      dfs = null;
    }
    AC_LIST.clear();
  }

  /**
   * This test checks that after a client write the client's state id is
   * updated via the RPC response.
   */
  @Test
  public void testStateTransferOnWrite() throws Exception {
    long preWriteState =
        cluster.getNamesystem(active).getLastWrittenTransactionId();
    DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
    long clientState = getContext(0).getLastSeenStateId();
    long postWriteState =
        cluster.getNamesystem(active).getLastWrittenTransactionId();
    // Write(s) should have increased state. Check for greater than.
    assertTrue(clientState > preWriteState);
    // Client and server state should be equal.
    assertEquals(postWriteState, clientState);
  }

  /**
   * This test checks that after a client read the client's state id is
   * updated via the RPC response.
   */
  @Test
  public void testStateTransferOnRead() throws Exception {
    DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
    long lastWrittenId =
        cluster.getNamesystem(active).getLastWrittenTransactionId();
    DFSTestUtil.readFile(dfs, new Path("/testFile2"));
    // Read should catch client up to last written state.
    long clientState = getContext(0).getLastSeenStateId();
    assertEquals(lastWrittenId, clientState);
  }

  /**
   * This test checks that a fresh client starts with no state and is brought
   * up to date with the server state by its first RPC call.
   */
  @Test
  public void testStateTransferOnFreshClient() throws Exception {
    DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
    long lastWrittenId =
        cluster.getNamesystem(active).getLastWrittenTransactionId();
    try (DistributedFileSystem clearDfs = HATestUtil.configureObserverReadFs(
            cluster, CONF, ORPPwithAlignmentContexts.class, true)) {
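      // The fresh client registered its context after the per-test client,
      // so it sits at index 1 in AC_LIST.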
      ClientGSIContext clientState = getContext(1);
      assertEquals(Long.MIN_VALUE, clientState.getLastSeenStateId());
      DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
      assertEquals(lastWrittenId, clientState.getLastSeenStateId());
    }
  }

  /**
   * This test checks that the client's state id keeps advancing via RPC
   * responses when writes are issued before and after a NameNode failover.
   */
  @Test
  public void testStateTransferOnWriteWithFailover() throws Exception {
    long preWriteState =
        cluster.getNamesystem(active).getLastWrittenTransactionId();
    // Write using HA client.
    DFSTestUtil.writeFile(dfs, new Path("/testFile1FO"), "123");
    long clientState = getContext(0).getLastSeenStateId();
    long postWriteState =
        cluster.getNamesystem(active).getLastWrittenTransactionId();
    // Write(s) should have increased state. Check for greater than.
    assertTrue(clientState > preWriteState);
    // Client and server state should be equal.
    assertEquals(postWriteState, clientState);

    // Failover NameNode.
    failOver();

    // Write using HA client.
    DFSTestUtil.writeFile(dfs, new Path("/testFile2FO"), "456");
    long clientStateFO = getContext(0).getLastSeenStateId();
    long writeStateFO =
        cluster.getNamesystem(active).getLastWrittenTransactionId();

    // Write(s) should have increased state. Check for greater than.
    assertTrue(clientStateFO > postWriteState);
    // Client and server state should be equal.
    assertEquals(writeStateFO, clientStateFO);
  }

  @Test
  @Timeout(value = 300)
  public void testMultiClientStatesWithRandomFailovers() throws Exception {
    // First run, half the load, with one failover.
    runClientsWithFailover(1, NUMCLIENTS/2, NUMFILES/2);
    // Second half, with fail back.
    runClientsWithFailover(NUMCLIENTS/2 + 1, NUMCLIENTS, NUMFILES/2);
  }

  private void runClientsWithFailover(int clientStartId,
                                      int clientEndId,
                                      int numFiles)
      throws Exception {
    ExecutorService execService = Executors.newFixedThreadPool(2);
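    // Fixed pool of two threads: at most two workers run concurrently while
    // the remaining submissions queue up.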
    clients = new ArrayList<>(clientEndId - clientStartId + 1);
    for (int i = clientStartId; i <= clientEndId; i++) {
      DistributedFileSystem haClient = HATestUtil.configureObserverReadFs(
          cluster, CONF, ORPPwithAlignmentContexts.class, true);
      clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i));
    }

    // Execute workers in threadpool with random failovers.
    List<Future<STATE>> futures = submitAll(execService, clients);
    execService.shutdown();

    boolean finished = false;
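    // Fail over while the workers are still writing so their observed state
    // ids span the active/standby transition.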
    failOver();

    while (!finished) {
      finished = execService.awaitTermination(20L, TimeUnit.SECONDS);
    }

    // Validation.
    for (Future<STATE> future : futures) {
      assertEquals(STATE.SUCCESS, future.get());
    }

    clients.clear();
  }

  private ClientGSIContext getContext(int clientCreationIndex) {
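    // Contexts are appended in client-creation order, so the index matches
    // the order in which clients were constructed.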
    return AC_LIST.get(clientCreationIndex);
  }

  private void failOver() throws IOException {
    LOG.info("Transitioning Active to Standby");
    cluster.transitionToStandby(active);
    LOG.info("Transitioning Standby to Active");
    cluster.transitionToActive(standby);
    int tempActive = active;
    active = standby;
    standby = tempActive;
  }

  /* ExecutorService.invokeAll() blocks until all tasks complete, so submit()
     is used instead. */
  private static List<Future<STATE>> submitAll(ExecutorService executor,
                                              Collection<Worker> calls) {
    List<Future<STATE>> futures = new ArrayList<>(calls.size());
    for (Worker call : calls) {
      Future<STATE> future = executor.submit(call);
      futures.add(future);
    }
    return futures;
  }

  private void killWorkers() throws IOException {
    if (clients != null) {
      for (Worker worker : clients) {
        worker.kill();
      }
      clients = null;
    }
  }

  private enum STATE { SUCCESS, FAIL, ERROR }

  private class Worker implements Callable<STATE> {
    private final DistributedFileSystem client;
    private final int filesToMake;
    private String filePath;
    private final int nonce;

    Worker(DistributedFileSystem client,
           int filesToMake,
           String filePath,
           int nonce) {
      this.client = client;
      this.filesToMake = filesToMake;
      this.filePath = filePath;
      this.nonce = nonce;
    }

    @Override
    public STATE call() {
      int i = -1;
      try {
        for (i = 0; i < filesToMake; i++) {
          ClientGSIContext gsiContext = getContext(nonce);
          long preClientStateFO = gsiContext.getLastSeenStateId();

          // Write using HA client.
          Path path = new Path(filePath + nonce + "_" + i);
          DFSTestUtil.writeFile(client, path, "erk");

          long postClientStateFO = gsiContext.getLastSeenStateId();

          // Write(s) should have increased state. Check for greater than.
          if (postClientStateFO < 0 || postClientStateFO <= preClientStateFO) {
            LOG.error("FAIL: Worker started with: {} , but finished with: {}",
                preClientStateFO, postClientStateFO);
            return STATE.FAIL;
          }

          if (i % (NUMFILES / 10) == 0) {
            LOG.info("Worker {} created {} files", nonce, i);
            LOG.info("LastSeenStateId = {}", postClientStateFO);
          }
        }
        return STATE.SUCCESS;
      } catch (Exception e) {
        LOG.error("ERROR: Worker failed with: ", e);
        return STATE.ERROR;
      } finally {
        LOG.info("Worker {} created {} files", nonce, i);
      }
    }

    public void kill() throws IOException {
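      // Force-close open streams and the NameNode connection so a stuck
      // worker cannot block test shutdown.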
      client.dfs.closeAllFilesBeingWritten(true);
      client.dfs.closeOutputStreams(true);
      client.dfs.closeConnectionToNamenode();
      client.dfs.close();
      client.close();
    }
  }
}