TestRouterAsyncRpcClient.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.fs.permission.FsAction.ALL;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Used to test the functionality of {@link RouterAsyncRpcClient}.
 *
 * <p>One federated mini cluster is started for the whole class. Before each
 * test a new {@link RouterAsyncRpcClient} is created against a randomly chosen
 * router and a 1024-byte test file is written through that router. The result
 * of every asynchronous invocation is collected with {@code syncReturn}.
 */
public class TestRouterAsyncRpcClient {
  private static Configuration routerConf;
  /** Federated HDFS cluster. */
  private static MiniRouterDFSCluster cluster;
  private static String ns0;
  private static String ns1;

  /** Random Router for this federated cluster. */
  private MiniRouterDFSCluster.RouterContext router;
  private FileSystem routerFs;
  private RouterRpcServer routerRpcServer;
  private RouterAsyncRpcClient asyncRpcClient;
  private FederationRPCMetrics rpcMetrics;
  /** Path of the file created in {@link #setup()} and removed in {@link #down()}. */
  private final String testFile = "/test.file";

  /**
   * Start a cluster using a router service that includes 2 namespaces,
   * 6 namenodes and 6 datanodes.
   *
   * @throws Exception if the cluster or the routers cannot be started.
   */
  @BeforeAll
  public static void setUpCluster() throws Exception {
    cluster = new MiniRouterDFSCluster(true, 2, 3,
        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
    cluster.setNumDatanodesPerNameservice(3);

    cluster.startCluster();

    // Making one Namenode active per nameservice
    if (cluster.isHighAvailability()) {
      for (String ns : cluster.getNameservices()) {
        cluster.switchToActive(ns, NAMENODES[0]);
        cluster.switchToStandby(ns, NAMENODES[1]);
        cluster.switchToObserver(ns, NAMENODES[2]);
      }
    }
    // Build the router configuration with the metrics and RPC services enabled
    routerConf = new RouterConfigBuilder()
        .metrics()
        .rpc()
        .build();

    // Reduce the number of RPC clients threads to overload the Router easy
    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
    // We decrease the DN cache times to make the test faster
    routerConf.setTimeDuration(
        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
    cluster.addRouterOverrides(routerConf);
    // Start routers with only an RPC service
    cluster.startRouters();

    // Register and verify all NNs with all routers
    cluster.registerNamenodes();
    cluster.waitNamenodeRegistration();
    cluster.waitActiveNamespaces();
    ns0 = cluster.getNameservices().get(0);
    ns1 = cluster.getNameservices().get(1);
  }

  /**
   * Shut down the shared cluster, if it was created.
   */
  @AfterAll
  public static void shutdownCluster() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Initialize the mount table, create a RouterAsyncRpcClient object, and create test file.
   *
   * @throws Exception if the router cannot be prepared or the file cannot be written.
   */
  @BeforeEach
  public void setup() throws Exception {
    // Create mock locations
    installMockLocations();

    router = cluster.getRandomRouter();
    rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
    routerFs = router.getFileSystem();
    routerRpcServer = router.getRouterRpcServer();
    routerRpcServer.initAsyncThreadPools(routerConf);

    // Create a RouterAsyncRpcClient object
    asyncRpcClient = new RouterAsyncRpcClient(
        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
        routerRpcServer.getRPCMonitor(),
        routerRpcServer.getRouterStateIdContext());

    // Create a test file; try-with-resources closes the stream even if the
    // write fails, so a failed setup does not leak an open output stream.
    try (FSDataOutputStream out = routerFs.create(new Path(testFile), true)) {
      out.write(new byte[1024]);
    }
  }

  /**
   * Restore the state changed by a test: clear the caller context, make the
   * first namenode of ns0 active again, delete the test file and close the
   * router file system.
   *
   * @throws IOException if the file cannot be deleted or the file system
   *         cannot be closed.
   */
  @AfterEach
  public void down() throws IOException {
    // clear client context
    CallerContext.setCurrent(null);
    try {
      cluster.switchToActive(ns0, NAMENODES[0]);
      // The per-test fields may still be null if setup() failed part-way.
      if (asyncRpcClient != null) {
        asyncRpcClient.getNamenodeResolver().updateActiveNamenode(
            ns0, NetUtils.createSocketAddr(cluster
                .getNamenode(ns0, NAMENODES[0]).getRpcAddress()));
      }
      if (routerFs != null) {
        // Delete the test file; it is a regular file, so no recursion is needed
        boolean deleted = routerFs.delete(new Path(testFile), false);
        assertTrue(deleted);
      }
    } finally {
      // Close the file system even when the cleanup above throws or asserts
      if (routerFs != null) {
        routerFs.close();
      }
    }
  }

  /**
   * Test the functionality of the asynchronous invokeSingle method.
   */
  @Test
  public void testInvokeSingle() throws Exception {
    long proxyOps = rpcMetrics.getProxyOps();
    long activeProxyOps = rpcMetrics.getActiveProxyOps();
    RemoteMethod method =
        new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
    asyncRpcClient.invokeSingle(ns0, method);
    long id = syncReturn(Long.class);
    assertTrue(id > 0);
    // Exactly one call is expected to have been proxied
    assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
    assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());
    assertTrue(rpcMetrics.getProcessingAvg() > 0);
    assertTrue(rpcMetrics.getProxyAvg() > 0);
  }

  /**
   * Test the functionality of the asynchronous invokeAll and invokeConcurrent methods.
   */
  @Test
  public void testInvokeAll() throws Exception {
    long proxyOps = rpcMetrics.getProxyOps();
    long activeProxyOps = rpcMetrics.getActiveProxyOps();
    // "/multDes" is mounted on both ns0 and ns1 by installMockLocations()
    final List<RemoteLocation> locations =
        routerRpcServer.getLocationsForPath("/multDes/dir", false);
    // First attempt: mkdirs with the last argument false is expected to fail
    // because the parent directory does not exist yet.
    RemoteMethod method = new RemoteMethod("mkdirs",
        new Class<?>[] {String.class, FsPermission.class, boolean.class},
        new RemoteParam(), new FsPermission(ALL, ALL, ALL), false);
    asyncRpcClient.invokeAll(locations, method);
    LambdaTestUtils.intercept(FileNotFoundException.class,
        "Parent directory doesn't exist: /multDes",
        () -> syncReturn(boolean.class));
    assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
    assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());

    // Second attempt: same call with the last argument true is expected to succeed
    proxyOps = rpcMetrics.getProxyOps();
    activeProxyOps = rpcMetrics.getActiveProxyOps();
    method = new RemoteMethod("mkdirs",
        new Class<?>[] {String.class, FsPermission.class, boolean.class},
        new RemoteParam(), new FsPermission(ALL, ALL, ALL), true);
    asyncRpcClient.invokeAll(locations, method);
    Boolean success = syncReturn(Boolean.class);
    assertTrue(success);
    assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
    assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());

    FileStatus[] fileStatuses = routerFs.listStatus(new Path("/multDes"));
    assertNotNull(fileStatuses);
    assertTrue(rpcMetrics.getProcessingAvg() > 0);
    assertTrue(rpcMetrics.getProxyAvg() > 0);
  }

  /**
   * Test the functionality of the asynchronous invokeMethod method.
   *
   * <p>Covers a successful call, an empty namenode list, a list without the
   * first (active) namenode, a nameservice with no active namenode, and a
   * call with a null protocol.
   */
  @Test
  public void testInvokeMethod() throws Exception {
    long proxyOps = rpcMetrics.getProxyOps();
    long activeProxyOps = rpcMetrics.getActiveProxyOps();
    RemoteMethod method = new RemoteMethod("getFileInfo",
        new Class<?>[] {String.class}, new RemoteParam());
    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
    Class<?> protocol = method.getProtocol();
    Object[] params = new String[]{testFile};
    List<? extends FederationNamenodeContext> namenodes =
        asyncRpcClient.getOrderedNamenodes(ns0, false);
    // Successful call against all namenodes of ns0
    asyncRpcClient.invokeMethod(ugi, namenodes, false,
        protocol, method.getMethod(), params);
    FileStatus fileStatus = syncReturn(FileStatus.class);
    assertEquals(1024, fileStatus.getLen());
    assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
    assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());

    // An empty namenode list is rejected
    LambdaTestUtils.intercept(IOException.class,
        "No namenodes to invoke",
        () -> asyncRpcClient.invokeMethod(ugi, new ArrayList<>(), false,
            protocol, method.getMethod(), params));

    // Skip the first namenode of the ordered list (presumably the active one,
    // given the HA states assigned in setUpCluster()); no op should be counted.
    proxyOps = rpcMetrics.getProxyOps();
    activeProxyOps = rpcMetrics.getActiveProxyOps();
    asyncRpcClient.invokeMethod(ugi, namenodes.subList(1, 3), false,
        protocol, method.getMethod(), params);
    LambdaTestUtils.intercept(StandbyException.class,
        "No namenode available to invoke getFileInfo",
        () -> syncReturn(FileStatus.class));
    assertEquals(proxyOps, rpcMetrics.getProxyOps());
    assertEquals(activeProxyOps, rpcMetrics.getActiveProxyOps());

    // Leave ns0 without an active namenode; down() restores it afterwards
    cluster.switchToStandby(ns0, NAMENODES[0]);
    asyncRpcClient.getNamenodeResolver().updateUnavailableNamenode(
        ns0, NetUtils.createSocketAddr(namenodes.get(0).getRpcAddress()));
    // Compare against a baseline instead of an absolute value: the cluster and
    // its router metrics are shared by all tests of this class.
    long noNamenodesOps = rpcMetrics.getProxyOpNoNamenodes();
    asyncRpcClient.invokeMethod(ugi, namenodes, false,
        protocol, method.getMethod(), params);
    LambdaTestUtils.intercept(RetriableException.class,
        "No namenodes available under nameservice ns0",
        () -> syncReturn(FileStatus.class));
    assertEquals(noNamenodesOps + 1, rpcMetrics.getProxyOpNoNamenodes());

    // A null protocol means no connection can be obtained
    long failureCommunicateOps = rpcMetrics.getProxyOpFailureCommunicate();
    asyncRpcClient.invokeMethod(ugi, namenodes, false,
        null, method.getMethod(), params);
    LambdaTestUtils.intercept(StandbyException.class,
        "Cannot get a connection",
        () -> syncReturn(FileStatus.class));
    assertEquals(failureCommunicateOps + 1,
        rpcMetrics.getProxyOpFailureCommunicate());
  }

  /**
   * Test the functionality of the asynchronous invokeSequential method.
   */
  @Test
  public void testInvokeSequential() throws Exception {
    List<RemoteLocation> locations =
        routerRpcServer.getLocationsForPath(testFile, false, false);
    RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
        new Class<?>[] {String.class, long.class, long.class},
        new RemoteParam(), 0, 1024);
    asyncRpcClient.invokeSequential(locations, remoteMethod,
        LocatedBlocks.class, null);
    LocatedBlocks locatedBlocks = syncReturn(LocatedBlocks.class);
    // The file written in setup() is 1024 bytes long
    assertEquals(1024, locatedBlocks.getFileLength());
    assertEquals(1, locatedBlocks.getLocatedBlocks().size());
  }

  /**
   * Initialize the mount information.
   *
   * <p>On every router, "/" is mapped to ns0 and "/multDes" is mapped to both
   * ns0 and ns1.
   */
  private void installMockLocations() {
    List<MiniRouterDFSCluster.RouterContext> routers = cluster.getRouters();

    for (MiniRouterDFSCluster.RouterContext rc : routers) {
      Router r = rc.getRouter();
      MockResolver resolver = (MockResolver) r.getSubclusterResolver();
      resolver.addLocation("/", ns0, "/");
      resolver.addLocation("/multDes", ns0, "/multDes");
      resolver.addLocation("/multDes", ns1, "/multDes");
    }
  }
}