TestRouterRefreshFairnessPolicyController.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.federation.fairness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.test.GenericTestUtils;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestRouterRefreshFairnessPolicyController {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class);
  private final GenericTestUtils.LogCapturer controllerLog =
      GenericTestUtils.LogCapturer.captureLogs(AbstractRouterRpcFairnessPolicyController.LOG);

  private StateStoreDFSCluster cluster;

  @BeforeAll
  public static void setLogLevel() {
    GenericTestUtils.setLogLevel(AbstractRouterRpcFairnessPolicyController.LOG, Level.DEBUG);
  }

  @AfterEach
  public void cleanup() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  @BeforeEach
  public void setupCluster() throws Exception {
    cluster = new StateStoreDFSCluster(false, 2);
    Configuration conf = new RouterConfigBuilder().stateStore().rpc().build();

    // Handlers concurrent:ns0 = 3:3
    conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
        StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
    conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 9);
    // Allow metrics
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, true);

    // Datanodes not needed for this test.
    cluster.setNumDatanodesPerNameservice(0);

    cluster.addRouterOverrides(conf);
    cluster.startCluster();
    cluster.startRouters();
    cluster.waitClusterUp();
  }

  @Test
  public void testRefreshNonexistentHandlerClass() {
    MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
    routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
        "org.apache.hadoop.hdfs.server.federation.fairness.ThisControllerDoesNotExist");
    assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
        routerContext.getRouterRpcClient()
            .refreshFairnessPolicyController(routerContext.getConf()));
  }

  @Test
  public void testRefreshClassDoesNotImplementControllerInterface() {
    MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
    routerContext.getConf()
        .set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, "java.lang.String");
    assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
        routerContext.getRouterRpcClient()
            .refreshFairnessPolicyController(routerContext.getConf()));
  }

  @Test
  public void testRefreshSuccessful() {
    MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();

    routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
        StaticRouterRpcFairnessPolicyController.class.getCanonicalName());
    assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
        routerContext.getRouterRpcClient()
            .refreshFairnessPolicyController(routerContext.getConf()));

    routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
        NoRouterRpcFairnessPolicyController.class.getCanonicalName());
    assertEquals(NoRouterRpcFairnessPolicyController.class.getCanonicalName(),
        routerContext.getRouterRpcClient()
            .refreshFairnessPolicyController(routerContext.getConf()));
  }

  @Test
  public void testConcurrentRefreshRequests() throws InterruptedException {
    MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
    RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
    controllerLog.clearOutput();

    // Spawn 100 concurrent refresh requests
    Thread[] threads = new Thread[100];
    for (int i = 0; i < 100; i++) {
      threads[i] = new Thread(() ->
          client.refreshFairnessPolicyController(routerContext.getConf()));
    }

    for (Thread thread : threads) {
      thread.start();
    }

    for (Thread thread : threads) {
      thread.join();
    }

    // There should be 100 controller shutdowns. All controllers created should be shut down.
    assertEquals(100, StringUtils.countMatches(controllerLog.getOutput(),
        "Shutting down router fairness policy controller"));
    controllerLog.clearOutput();
  }

  @Test
  public void testRefreshStaticChangeHandlers() throws Exception {
    // Setup and mock
    MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
    RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
    final long sleepTime = 3000;
    Mockito.doAnswer(invocationOnMock -> {
      Thread.sleep(sleepTime);
      return null;
    }).when(client)
        .invokeMethod(Mockito.any(), Mockito.any(), Mockito.anyBoolean(),
            Mockito.any(), Mockito.any(), Mockito.any());

    // No calls yet
    assertEquals("{}",
        routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
    List<Thread> preRefreshInvocations = makeDummyInvocations(client, 4, "ns0");

    Thread.sleep(2000);
    // 3 permits acquired, calls will take 3s to finish and release permits
    // 1 invocation rejected
    assertEquals("{\"ns0\":3}",
        routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
    assertEquals("{\"ns0\":1}",
        routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs());

    Configuration conf = routerContext.getConf();
    final int newNs0Permits = 2;
    final int newNs1Permits = 4;
    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits);
    Thread threadRefreshController = new Thread(() -> client.
        refreshFairnessPolicyController(routerContext.getConf()));
    threadRefreshController.start();
    threadRefreshController.join();

    // Wait for all dummy invocation threads to finish
    for (Thread thread : preRefreshInvocations) {
      thread.join();
    }

    // Controller should now have 2:4 handlers for ns0:ns1
    // Make 4 calls to ns0 and 6 calls to ns1 so that each will fail twice
    StaticRouterRpcFairnessPolicyController controller =
        (StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController();
    System.out.println(controller.getAvailableHandlerOnPerNs());
    List<Thread> ns0Invocations = makeDummyInvocations(client, newNs0Permits + 2, "ns0");
    List<Thread> ns1Invocations = makeDummyInvocations(client, newNs1Permits + 2, "ns1");

    // Wait for these threads to finish
    for (Thread thread : ns0Invocations) {
      thread.join();
    }
    for (Thread thread : ns1Invocations) {
      thread.join();
    }
    assertEquals("{\"ns0\":5,\"ns1\":4}",
        routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
    assertEquals("{\"ns0\":3,\"ns1\":2}",
        routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs());
  }

  private List<Thread> makeDummyInvocations(RouterRpcClient client, final int nThreads,
      final String namespace) {
    RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class);
    List<Thread> threadAcquirePermits = new ArrayList<>();
    for (int i = 0; i < nThreads; i++) {
      Thread threadAcquirePermit = new Thread(() -> {
        try {
          client.invokeSingle(namespace, dummyMethod);
        } catch (IOException e) {
          e.printStackTrace();
        }
      });
      threadAcquirePermits.add(threadAcquirePermit);
      threadAcquirePermit.start();
    }
    return threadAcquirePermits;
  }
}