// TestRouterClientRejectOverload.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToActive;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Test the Router overload control which rejects requests when the RPC client
 * is overloaded. This feature is managed by
 * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
 */
public class TestRouterClientRejectOverload {

  /** Logger used by the tests and by the client tasks they spawn. */
  private static final Logger LOG =
      LoggerFactory.getLogger(TestRouterClientRejectOverload.class);

  /**
   * Federated cluster under test. Assigned by {@code setupCluster} and shut
   * down and reset to {@code null} by {@code cleanup} after each test.
   */
  private StateStoreDFSCluster cluster;

  /**
   * Shut down the cluster created by the test, if there is one, and drop the
   * reference to it.
   */
  @AfterEach
  public void cleanup() {
    if (cluster == null) {
      // Nothing was started by this test
      return;
    }
    cluster.shutdown();
    cluster = null;
  }

  /**
   * Create and start the federated cluster used by a test and wait until it
   * is up. The Routers get a small RPC client thread pool and no datanodes
   * are started.
   *
   * @param overloadControl Value set for
   *          {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
   * @param ha Forwarded to the {@link StateStoreDFSCluster} constructor.
   * @throws Exception If the cluster cannot be built or started.
   */
  private void setupCluster(boolean overloadControl, boolean ha)
      throws Exception {
    cluster = new StateStoreDFSCluster(ha, 2);

    // Router services needed by the tests
    final Configuration overrides = new RouterConfigBuilder()
        .stateStore()
        .metrics()
        .admin()
        .rpc()
        .heartbeat()
        .build();
    // Few RPC client threads so the Router gets overloaded easily
    overrides.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
    // Switch the overload rejection on or off
    overrides.setBoolean(
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);

    // renewLease() is used to generate load, so no datanodes are needed
    cluster.setNumDatanodesPerNameservice(0);
    cluster.addRouterOverrides(overrides);

    // Bring everything up
    cluster.startCluster();
    cluster.startRouters();
    cluster.waitClusterUp();
  }

  /**
   * With overload control disabled no request is rejected, neither with
   * healthy namenodes nor after one namenode has been slowed down.
   */
  @Test
  public void testWithoutOverloadControl() throws Exception {
    setupCluster(false, false);

    // Baseline: no rejection expected
    testOverloaded(0);

    // Slow down the namenode of subcluster 0
    NameNode slowNamenode = cluster.getCluster().getNameNode(0);
    simulateSlowNamenode(slowNamenode, 1);

    // Still no rejection, the requests just take longer
    testOverloaded(0);

    // None of the Routers may have counted an overload rejection
    for (RouterContext context : cluster.getRouters()) {
      FederationRPCMetrics metrics =
          context.getRouter().getRpcServer().getRPCMetrics();
      assertEquals(0, metrics.getProxyOpFailureClientOverloaded());
    }
  }

  /**
   * With overload control enabled, a slow namenode makes a single Router
   * reject requests, while a client configured with both Routers gets all
   * its requests served.
   */
  @Test
  public void testOverloadControl() throws Exception {
    setupCluster(true, false);

    List<RouterContext> routerContexts = cluster.getRouters();
    FederationRPCMetrics metricsRouter0 =
        routerContexts.get(0).getRouter().getRpcServer().getRPCMetrics();
    FederationRPCMetrics metricsRouter1 =
        routerContexts.get(1).getRouter().getRpcServer().getRPCMetrics();

    // Healthy namenodes: nothing is rejected
    testOverloaded(0);
    assertEquals(0, metricsRouter0.getProxyOpFailureClientOverloaded());
    assertEquals(0, metricsRouter1.getProxyOpFailureClientOverloaded());

    // Slow down the namenode of subcluster 0
    NameNode slowNamenode = cluster.getCluster().getNameNode(0);
    simulateSlowNamenode(slowNamenode, 1);

    // A single Router is now overloaded: between 4 and 6 rejections
    testOverloaded(4, 6);
    assertTrue(metricsRouter0.getProxyOpFailureClientOverloaded()
        + metricsRouter1.getProxyOpFailureClientOverloaded() >= 4);

    // A client that knows both Routers should not see any rejection
    Configuration haClientConf = cluster.getRouterClientConf();
    final int numOps = 10;

    // Each Router should serve a similar share (>=8) of the 2*10 proxy ops
    long opsBefore0 = metricsRouter0.getProxyOps();
    long opsBefore1 = metricsRouter1.getProxyOps();
    testOverloaded(0, 0, new URI("hdfs://fed/"), haClientConf, numOps);
    long opsDelta0 = metricsRouter0.getProxyOps() - opsBefore0;
    long opsDelta1 = metricsRouter1.getProxyOps() - opsBefore1;
    assertEquals(2 * numOps, opsDelta0 + opsDelta1);
    assertTrue(opsDelta0 >= 8, opsDelta0 + " operations: not distributed");
    assertTrue(opsDelta1 >= 8, opsDelta1 + " operations: not distributed");
  }

  /**
   * Run the overload check expecting an exact number of rejected requests.
   * @param expOverload Exact number of requests expected as overloaded.
   * @throws Exception If it cannot perform the test.
   */
  private void testOverloaded(int expOverload) throws Exception {
    // Same value as lower and upper bound means an exact match is required
    final int exactCount = expOverload;
    testOverloaded(exactCount, exactCount);
  }

  /**
   * Run the overload check with 10 operations against one Router picked at
   * random, using a fresh default client configuration.
   * @param expOverloadMin Min number of requests expected as overloaded.
   * @param expOverloadMax Max number of requests expected as overloaded.
   * @throws Exception If it cannot perform the test.
   */
  private void testOverloaded(int expOverloadMin, int expOverloadMax)
      throws Exception {
    final URI routerUri = cluster.getRandomRouter().getFileSystemURI();
    testOverloaded(
        expOverloadMin, expOverloadMax, routerUri, new HdfsConfiguration(), 10);
  }

  /**
   * Test if the Router gets overloaded by submitting requests in parallel.
   * Each request runs in its own thread with its own client, and we check
   * how many requests got rejected at the end. Any failure other than an
   * overload rejection fails the test.
   * @param expOverloadMin Min number of requests expected as overloaded.
   * @param expOverloadMax Max number of requests expected as overloaded.
   * @param address Destination address.
   * @param conf Configuration of the client.
   * @param numOps Number of operations to submit.
   * @throws Exception If it cannot perform the test.
   */
  private void testOverloaded(int expOverloadMin, int expOverloadMax,
      final URI address, final Configuration conf, final int numOps)
          throws Exception {

    // Submit renewLease() ops which go to all subclusters
    final AtomicInteger overloadException = new AtomicInteger();
    ExecutorService exec = Executors.newFixedThreadPool(numOps);
    try {
      List<Future<?>> futures = new ArrayList<>(numOps);
      for (int i = 0; i < numOps; i++) {
        // Stagger the operations a little (50ms)
        final int sleepTime = i * 50;
        Future<?> future = exec.submit(() -> {
          DFSClient routerClient = null;
          try {
            Thread.sleep(sleepTime);
            routerClient = new DFSClient(address, conf);
            String clientName = routerClient.getClientName();
            ClientProtocol routerProto = routerClient.getNamenode();
            routerProto.renewLease(clientName, null);
          } catch (RemoteException re) {
            // The only tolerated failure is the overload rejection
            IOException ioe = re.unwrapRemoteException();
            assertTrue(ioe instanceof StandbyException, "Wrong exception: " + ioe);
            assertExceptionContains("is overloaded", ioe);
            overloadException.incrementAndGet();
          } catch (IOException e) {
            fail("Unexpected exception: " + e);
          } catch (InterruptedException e) {
            // Keep the interrupt status visible to the pool before failing
            Thread.currentThread().interrupt();
            fail("Cannot sleep: " + e);
          } finally {
            if (routerClient != null) {
              try {
                routerClient.close();
              } catch (IOException e) {
                LOG.error("Cannot close the client", e);
              }
            }
          }
        });
        futures.add(future);
      }
      // Wait until all the requests are done; get() rethrows assertion
      // failures raised inside a task wrapped in an ExecutionException
      for (Future<?> future : futures) {
        future.get();
      }
    } finally {
      // Release the pool threads even when a request failed
      exec.shutdown();
    }

    int num = overloadException.get();
    if (expOverloadMin == expOverloadMax) {
      assertEquals(expOverloadMin, num);
    } else {
      assertTrue(num >= expOverloadMin, "Expected >=" + expOverloadMin + " but was " + num);
      assertTrue(num <= expOverloadMax, "Expected <=" + expOverloadMax + " but was " + num);
    }
  }

  /**
   * Make the first Router's RPC server throw and check that a client call
   * still succeeds, that the first Router counts exactly one more
   * communicate failure, and that the second Router's counter is unchanged.
   */
  @Test
  public void testConnectionNullException() throws Exception {
    setupCluster(false, false);

    // Choose 1st router
    RouterContext routerContext = cluster.getRouters().get(0);
    Router router = routerContext.getRouter();
    // This router will throw ConnectionNullException
    simulateThrowExceptionRouterRpcServer(router.getRpcServer());

    // Set dfs.client.failover.random.order false, to pick 1st router at first
    Configuration conf = cluster.getRouterClientConf();
    conf.setBoolean("dfs.client.failover.random.order", false);

    // Get router0 metrics
    FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
        .getRouter().getRpcServer().getRPCMetrics();
    // Get router1 metrics
    FederationRPCMetrics rpcMetrics1 = cluster.getRouters().get(1)
        .getRouter().getRpcServer().getRPCMetrics();

    // Client to access Router Cluster; closed when the checks are done
    try (DFSClient routerClient =
        new DFSClient(new URI("hdfs://fed"), conf)) {
      // Original failures
      long originalRouter0Failures = rpcMetrics0.getProxyOpFailureCommunicate();
      long originalRouter1Failures = rpcMetrics1.getProxyOpFailureCommunicate();

      // RPC call must be successful
      routerClient.getFileInfo("/");

      // Router 0 failures will increase
      assertEquals(originalRouter0Failures + 1,
          rpcMetrics0.getProxyOpFailureCommunicate());
      // Router 1 failures will not change
      assertEquals(originalRouter1Failures,
          rpcMetrics1.getProxyOpFailureCommunicate());
    }
  }

  /**
   * When failover occurs, no namenodes are available for a short time.
   * While every namenode is in standby the client call fails with a
   * NoNamenodesAvailableException message; once a nameservice is active
   * again and the Routers have refreshed their view, the call succeeds.
   */
  @Test
  public void testNoNamenodesAvailable() throws Exception {
    setupCluster(false, true);

    transitionClusterNSToStandby(cluster);

    Configuration conf = cluster.getRouterClientConf();
    // Set dfs.client.failover.random.order false, to pick 1st router at first
    conf.setBoolean("dfs.client.failover.random.order", false);

    // Retries is 3 (see FailoverOnNetworkExceptionRetry#shouldRetry, will fail
    // when retries > max.attempts), so total access is 4.
    conf.setInt("dfs.client.retry.max.attempts", 2);

    // Get router0 metrics
    FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
        .getRouter().getRpcServer().getRPCMetrics();
    // Get router1 metrics
    FederationRPCMetrics rpcMetrics1 = cluster.getRouters().get(1)
        .getRouter().getRpcServer().getRPCMetrics();

    // Message expected while no namenode is available
    String exceptionMessage = "org.apache.hadoop.hdfs.server.federation."
        + "router.NoNamenodesAvailableException: No namenodes available "
        + "under nameservice ns0";

    try (DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf)) {
      // Original failures
      long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
      long originalRouter1Failures = rpcMetrics1.getProxyOpNoNamenodes();

      // Only the failing call goes inside assertThrows: statements placed
      // after it in the same lambda are skipped once the exception is thrown.
      RemoteException remoteException = assertThrows(RemoteException.class,
          () -> routerClient.getFileInfo("/"));
      assertThat(remoteException.getMessage()).contains(exceptionMessage);

      // Router 0 failures will increase
      assertEquals(originalRouter0Failures + 4,
          rpcMetrics0.getProxyOpNoNamenodes());
      // Router 1 failures do not change
      assertEquals(originalRouter1Failures,
          rpcMetrics1.getProxyOpNoNamenodes());

      // Make name services available
      transitionClusterNSToActive(cluster, 0);
      for (RouterContext routerContext : cluster.getRouters()) {
        // Manually trigger the heartbeat
        Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
            .getRouter().getNamenodeHeartbeatServices();
        for (NamenodeHeartbeatService service : heartbeatServices) {
          service.periodicInvoke();
        }
        // Update service cache
        routerContext.getRouter().getStateStore().refreshCaches(true);
      }

      long failuresBeforeRecovery = rpcMetrics0.getProxyOpNoNamenodes();

      // RPC call must be successful
      routerClient.getFileInfo("/");
      // Router 0 failures do not change
      assertEquals(failuresBeforeRecovery, rpcMetrics0.getProxyOpNoNamenodes());
    }
  }

  /**
   * When failover occurs, the router may record that the ns has no active namenode.
   * Only when the router updates the cache next time can the memory status be updated,
   * which could make the router report NoNamenodesAvailableException for a long time.
   * This test checks that a read succeeds before the next cache flush, at the
   * cost of exactly one NoNamenodesAvailableException counted on router 0.
   */
  @Test
  public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
    setupCluster(false, true);
    transitionClusterNSToStandby(cluster);
    for (RouterContext routerContext : cluster.getRouters()) {
      // Manually trigger the heartbeat
      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
          .getRouter().getNamenodeHeartbeatServices();
      for (NamenodeHeartbeatService service : heartbeatServices) {
        service.periodicInvoke();
      }
      // Update service cache
      routerContext.getRouter().getStateStore().refreshCaches(true);
    }
    // Record the time after the router first updated the cache
    long firstLoadTime = Time.now();
    List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();

    // Make sure all namenodes are in standby state
    for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
      assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState());
    }

    Configuration conf = cluster.getRouterClientConf();
    // Set dfs.client.failover.random.order false, to pick 1st router at first
    conf.setBoolean("dfs.client.failover.random.order", false);

    // The client is closed once the checks are done
    try (DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf)) {

      for (RouterContext routerContext : cluster.getRouters()) {
        // Get the second namenode in the router cache and make it active
        List<? extends FederationNamenodeContext> namenodesInNs0 =
            routerContext.getRouter()
                .getNamenodeResolver()
                .getNamenodesForNameserviceId("ns0", false);

        String nnId = namenodesInNs0.get(1).getNamenodeId();
        cluster.switchToActive("ns0", nnId);
        // Manually trigger the heartbeat, but the router does not manually load the cache
        Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
            .getRouter().getNamenodeHeartbeatServices();
        for (NamenodeHeartbeatService service : heartbeatServices) {
          service.periodicInvoke();
        }
        assertEquals(ACTIVE.ordinal(),
            cluster.getNamenode("ns0", nnId).getNamenode().getNameNodeState());
      }

      // Get router0 metrics
      FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
          .getRouter().getRpcServer().getRPCMetrics();
      // Original failures
      long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();

      /*
       * At this time, the router has recorded 2 standby namenodes in memory,
       * and the first accessed namenode is indeed standby,
       * then an NoNamenodesAvailableException will be reported for the first access,
       * and the next access will be successful.
       */
      routerClient.getFileInfo("/");
      long successReadTime = Time.now();
      assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes());

      /*
       * access the active namenode without waiting for the router to update the cache,
       * even if there are 2 standby states recorded in the router memory.
       */
      long elapsed = successReadTime - firstLoadTime;
      long cacheFlushInterval = cluster.getCacheFlushInterval();
      assertTrue(elapsed < cacheFlushInterval,
          "Read succeeded after " + elapsed
              + " which is not below the cache flush interval " + cacheFlushInterval);
    }
  }


  /**
   * Check the JSON reported by
   * {@link FederationRPCMetrics#getAsyncCallerPool()}: before any request it
   * shows 0 active, 0 total and max 4; while one request is held up by a
   * slow namenode it shows 1 active, max 4 and a total between 1 and 4.
   */
  @Test
  public void testAsyncCallerPoolMetrics() throws Exception {
    setupCluster(true, false);
    simulateSlowNamenode(cluster.getCluster().getNameNode(0), 2);
    final ObjectMapper objectMapper = new ObjectMapper();

    // Set only one router to make test easier
    // NOTE(review): if getRouters() returns the cluster's live list, the
    // removed Router may not be stopped by cluster.shutdown() - verify.
    cluster.getRouters().remove(1);
    FederationRPCMetrics metrics = cluster.getRouters().get(0).getRouter()
        .getRpcServer().getRPCMetrics();

    // No active connection initially
    Map<String, Integer> result = objectMapper
        .readValue(metrics.getAsyncCallerPool(), Map.class);
    assertEquals(0, result.get("active").intValue());
    assertEquals(0, result.get("total").intValue());
    assertEquals(4, result.get("max").intValue());

    ExecutorService exec = Executors.newSingleThreadExecutor();

    try {
      // Run a client request to create an active connection
      Future<?> request = exec.submit(() -> {
        DFSClient routerClient = null;
        try {
          routerClient = new DFSClient(new URI("hdfs://fed"),
              cluster.getRouterClientConf());
          String clientName = routerClient.getClientName();
          ClientProtocol routerProto = routerClient.getNamenode();
          routerProto.renewLease(clientName, null);
        } catch (Exception e) {
          fail("Client request failed: " + e);
        } finally {
          if (routerClient != null) {
            try {
              routerClient.close();
            } catch (IOException e) {
              LOG.error("Cannot close the client", e);
            }
          }
        }
      });

      // Wait for client request to be active
      GenericTestUtils.waitFor(() -> {
        try {
          Map<String, Integer> newResult = objectMapper.readValue(
              metrics.getAsyncCallerPool(), Map.class);
          if (newResult.get("active") != 1) {
            return false;
          }
          if (newResult.get("max") != 4) {
            return false;
          }
          int total = newResult.get("total");
          // "total" is dynamic
          return total >= 1 && total <= 4;
        } catch (Exception e) {
          LOG.error("Not able to parse metrics result: " + e);
        }
        return false;
      }, 100, 2000);

      // A fail() raised in the background task only reaches the test through
      // the Future, so wait for the request and let get() rethrow it
      request.get();
    } finally {
      exec.shutdown();
    }
  }
}