RouterAsyncRpcClient.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdContext;
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;

/**
 * The {@code RouterAsyncRpcClient} class extends the functionality of the base
 * {@code RouterRpcClient} class to provide asynchronous remote procedure call (RPC)
 * capabilities for communication with the Hadoop Distributed File System (HDFS)
 * NameNodes in a federated environment.
 *
 * <p>This class is responsible for managing the asynchronous execution of RPCs to
 * multiple NameNodes, which can improve performance and scalability in large HDFS
 * deployments.
 *
 * <p>The class also includes methods for handling failover scenarios, where it can
 * automatically retry operations on alternative NameNodes if the primary NameNode is
 * unavailable or in standby mode.
 *
 * @see RouterRpcClient
 */
public class RouterAsyncRpcClient extends RouterRpcClient{
  private static final Logger LOG =
      LoggerFactory.getLogger(RouterAsyncRpcClient.class);
  /** Router using this RPC client. */
  private final Router router;
  /** Interface to identify the active NN for a nameservice or blockpool ID. */
  private final ActiveNamenodeResolver namenodeResolver;
  /** Optional perf monitor. */
  private final RouterRpcMonitor rpcMonitor;

  /**
   * Create a router async RPC client to manage remote procedure calls to NNs.
   *
   * @param conf Hdfs Configuration.
   * @param router A router using this RPC client.
   * @param resolver A NN resolver to determine the currently active NN in HA.
   * @param monitor Optional performance monitor.
   * @param routerStateIdContext the router state context object to hold the state ids for all
   * namespaces.
   */
  public RouterAsyncRpcClient(Configuration conf,
      Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
      RouterStateIdContext routerStateIdContext) {
    super(conf, router, resolver, monitor, routerStateIdContext);
    this.router = router;
    this.namenodeResolver = resolver;
    this.rpcMonitor = monitor;
  }

  @Override
  protected void initConcurrentCallExecutorService(Configuration conf) {
    // No need to initialize the thread pool for concurrent call.
  }

  /**
   * Invoke method in all locations and return success if any succeeds.
   *
   * @param <T> The type of the remote location.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @return If the call succeeds in any location.
   * @throws IOException If any of the calls return an exception.
   */
  @Override
  public <T extends RemoteLocationContext> boolean invokeAll(
      final Collection<T> locations, final RemoteMethod method)
      throws IOException {
    invokeConcurrent(locations, method, false, false,
        Boolean.class);
    asyncApply((ApplyFunction<Map<T, Boolean>, Object>)
        results -> results.containsValue(true));
    return asyncReturn(boolean.class);
  }

  /**
   * Invokes a method against the ClientProtocol proxy server. If a standby
   * exception is generated by the call to the client, retries using the
   * alternate server.
   * <p>
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param ugi User group information.
   * @param namenodes A prioritized list of namenodes within the same
   *                  nameservice.
   * @param useObserver Whether to use observer namenodes.
   * @param protocol the protocol of the connection.
   * @param method Remote ClientProtocol method to invoke.
   * @param params Variable list of parameters matching the method.
   * @return The result of invoking the method.
   * @throws ConnectException If it cannot connect to any Namenode.
   * @throws StandbyException If all Namenodes are in Standby.
   * @throws IOException If it cannot invoke the method.
   */
  @Override
  public Object invokeMethod(
      UserGroupInformation ugi,
      List<? extends FederationNamenodeContext> namenodes,
      boolean useObserver, Class<?> protocol,
      Method method, Object... params) throws IOException {
    if (namenodes == null || namenodes.isEmpty()) {
      throw new IOException("No namenodes to invoke " + method.getName() +
          " with params " + Arrays.deepToString(params) + " from "
          + router.getRouterId());
    }
    String nsid = namenodes.get(0).getNameserviceId();
    // transfer threadLocalContext to worker threads of executor.
    ThreadLocalContext threadLocalContext = new ThreadLocalContext();
    asyncComplete(null);
    asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver,
            namenodes.toString(), params);
      }
      threadLocalContext.transfer();
      RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
      acquirePermit(nsid, ugi, method.getName(), controller);
      invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
          useObserver, protocol, method, params);
      asyncFinally(object -> {
        releasePermit(nsid, ugi, method, controller);
        return object;
      });
    }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
        router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
    return null;
  }

  /**
   * Asynchronously invokes a method on the specified NameNodes for a given user and operation.
   * This method is responsible for the actual execution of the remote method call on the
   * NameNodes in a non-blocking manner, allowing for concurrent processing.
   *
   * <p>In case of exceptions, the method includes logic to handle retries, failover to standby
   * NameNodes, and proper exception handling to ensure that the calling code can respond
   * appropriately to different error conditions.
   *
   * @param ugi The user information under which the method is to be invoked.
   * @param namenodes The list of NameNode contexts on which the method will be invoked.
   * @param useObserver Whether to use an observer node for the invocation if available.
   * @param protocol The protocol class defining the method to be invoked.
   * @param method The method to be invoked on the NameNodes.
   * @param params The parameters for the method invocation.
   */
  private void invokeMethodAsync(
      final UserGroupInformation ugi,
      final List<FederationNamenodeContext> namenodes,
      boolean useObserver,
      final Class<?> protocol, final Method method, final Object... params) {

    addClientInfoToCallerContext(ugi);
    if (rpcMonitor != null) {
      rpcMonitor.proxyOp();
    }
    final ExecutionStatus status = new ExecutionStatus(false, useObserver);
    Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
    final ConnectionContext[] connection = new ConnectionContext[1];
    asyncForEach(namenodes.iterator(),
        (foreach, namenode) -> {
          if (!status.isShouldUseObserver()
              && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
            asyncComplete(null);
            return;
          }
          String nsId = namenode.getNameserviceId();
          String rpcAddress = namenode.getRpcAddress();
          asyncTry(() -> {
            connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
            NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
            invoke(namenode, status.isShouldUseObserver(), 0, method,
                client.getProxy(), params);
            asyncApply(res -> {
              status.setComplete(true);
              postProcessResult(method, status, namenode, nsId, client);
              foreach.breakNow();
              return res;
            });
          });
          asyncCatch((res, ioe) -> {
            ioes.put(namenode, ioe);
            handleInvokeMethodIOException(namenode, ioe, status, useObserver);
            return res;
          }, IOException.class);
          asyncFinally(res -> {
            if (connection[0] != null) {
              connection[0].release();
            }
            return res;
          });
        });

    asyncApply(res -> {
      if (status.isComplete()) {
        return res;
      }
      return handlerAllNamenodeFail(namenodes, method, ioes, params);
    });
  }

  /**
   * Asynchronously invokes a method on a specified NameNode in the context of the given
   * namespace and NameNode information. This method is designed to handle the invocation
   * in a non-blocking manner, allowing for improved performance and scalability when
   * interacting with the NameNode.
   *
   * @param namenode The context information for the NameNode.
   * @param listObserverFirst Whether to list the observer node first in the invocation list.
   * @param retryCount The current retry count for the operation.
   * @param method The method to be invoked on the NameNode.
   * @param obj The proxy object through which the method will be invoked.
   * @param params The parameters for the method invocation.
   */
  protected Object invoke(
      FederationNamenodeContext namenode, Boolean listObserverFirst,
      int retryCount, final Method method,
      final Object obj, final Object... params) throws IOException {
    try {
      Client.setAsynchronousMode(true);
      method.invoke(obj, params);
      Client.setAsynchronousMode(false);
      asyncCatch((AsyncCatchFunction<Object, Throwable>) (o, e) -> {
        handlerInvokeException(namenode, listObserverFirst,
            retryCount, method, obj, e, params);
      }, Throwable.class);
    } catch (InvocationTargetException e) {
      asyncThrowException(e.getCause());
    } catch (IllegalAccessException | IllegalArgumentException e) {
      LOG.error("Unexpected exception while proxying API", e);
      asyncThrowException(e);
    }
    return null;
  }

  /**
   * Invokes sequential proxy calls to different locations. Continues to invoke
   * calls until the success condition is met, or until all locations have been
   * attempted.
   *
   * The success condition may be specified by:
   * <ul>
   * <li>An expected result class
   * <li>An expected result value
   * </ul>
   *
   * If no expected result class/values are specified, the success condition is
   * a call that does not throw a remote exception.
   *
   * @param <T> The type of the remote method return.
   * @param locations List of locations/nameservices to call concurrently.
   * @param remoteMethod The remote method and parameters to invoke.
   * @param expectedResultClass In order to be considered a positive result, the
   *          return type must be of this class.
   * @param expectedResultValue In order to be considered a positive result, the
   *          return value must equal the value of this object.
   * @return The result of the first successful call, or if no calls are
   *         successful, the result of the first RPC call executed.
   * @throws IOException if the success condition is not met, return the first
   *                     remote exception generated.
   */
  @Override
  public <T> T invokeSequential(
      final List<? extends RemoteLocationContext> locations,
      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
      Object expectedResultValue) throws IOException {
    invokeSequential(remoteMethod, locations, expectedResultClass, expectedResultValue);
    asyncApply((ApplyFunction<RemoteResult, Object>) RemoteResult::getResult);
    return asyncReturn(expectedResultClass);
  }

  /**
   * Invokes sequential proxy calls to different locations. Continues to invoke
   * calls until the success condition is met, or until all locations have been
   * attempted.
   *
   * The success condition may be specified by:
   * <ul>
   * <li>An expected result class
   * <li>An expected result value
   * </ul>
   *
   * If no expected result class/values are specified, the success condition is
   * a call that does not throw a remote exception.
   *
   * This returns RemoteResult, which contains the invoked location as well
   * as the result.
   *
   * @param <R> The type of the remote location.
   * @param <T> The type of the remote method return.
   * @param remoteMethod The remote method and parameters to invoke.
   * @param locations List of locations/nameservices to call concurrently.
   * @param expectedResultClass In order to be considered a positive result, the
   *          return type must be of this class.
   * @param expectedResultValue In order to be considered a positive result, the
   *          return value must equal the value of this object.
   * @return The result of the first successful call, or if no calls are
   *         successful, the result of the first RPC call executed, along with
   *         the invoked location in form of RemoteResult.
   * @throws IOException if the success condition is not met, return the first
   *                     remote exception generated.
   */
  @Override
  public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
      final RemoteMethod remoteMethod, final List<R> locations,
      Class<T> expectedResultClass, Object expectedResultValue)
      throws IOException {

    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
    final Method m = remoteMethod.getMethod();
    List<IOException> thrownExceptions = new ArrayList<>();
    final Object[] firstResult = {null};
    final ExecutionStatus status = new ExecutionStatus();
    Iterator<RemoteLocationContext> locationIterator =
        (Iterator<RemoteLocationContext>) locations.iterator();
    // Invoke in priority order
    asyncForEach(locationIterator,
        (foreach, loc) -> {
          String ns = loc.getNameserviceId();
          boolean isObserverRead = isObserverReadEligible(ns, m);
          List<? extends FederationNamenodeContext> namenodes =
              getOrderedNamenodes(ns, isObserverRead);
          asyncTry(() -> {
            Class<?> proto = remoteMethod.getProtocol();
            Object[] params = remoteMethod.getParams(loc);
            invokeMethod(ugi, namenodes, isObserverRead, proto, m, params);
            asyncApply(result -> {
              // Check if the result is what we expected
              if (isExpectedClass(expectedResultClass, result) &&
                  isExpectedValue(expectedResultValue, result)) {
                // Valid result, stop here
                @SuppressWarnings("unchecked") R location = (R) loc;
                @SuppressWarnings("unchecked") T ret = (T) result;
                foreach.breakNow();
                status.setComplete(true);
                return new RemoteResult<>(location, ret);
              }
              if (firstResult[0] == null) {
                firstResult[0] = result;
              }
              return null;
            });
          });
          asyncCatch((ret, e) -> {
            if (e instanceof IOException) {
              IOException ioe = (IOException) e;
              // Localize the exception
              ioe = processException(ioe, loc);
              // Record it and move on
              thrownExceptions.add(ioe);
            } else {
              // Unusual error, ClientProtocol calls always use IOException (or
              // RemoteException). Re-wrap in IOException for compatibility with
              // ClientProtocol.
              LOG.error("Unexpected exception {} proxying {} to {}",
                  e.getClass(), m.getName(), ns, e);
              IOException ioe = new IOException(
                  "Unexpected exception proxying API " + e.getMessage(), e);
              thrownExceptions.add(ioe);
            }
            return ret;
          }, Exception.class);
        });
    asyncApply(result -> {
      if (status.isComplete()) {
        return result;
      }
      if (!thrownExceptions.isEmpty()) {
        // An unavailable subcluster may be the actual cause
        // We cannot surface other exceptions (e.g., FileNotFoundException)
        for (int i = 0; i < thrownExceptions.size(); i++) {
          IOException ioe = thrownExceptions.get(i);
          if (isUnavailableException(ioe)) {
            throw ioe;
          }
        }
        // re-throw the first exception thrown for compatibility
        throw thrownExceptions.get(0);
      }
      // Return the first result, whether it is the value or not
      @SuppressWarnings("unchecked") T ret = (T) firstResult[0];
      return new RemoteResult<>(locations.get(0), ret);
    });
    return asyncReturn(RemoteResult.class);
  }

  /**
   * Invokes multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   * <p>
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @param requireResponse If true an exception will be thrown if all calls do
   *          not complete. If false exceptions are ignored and all data results
   *          successfully received are returned.
   * @param standby If the requests should go to the standby namenodes too.
   * @param timeOutMs Timeout for each individual call.
   * @param clazz Type of the remote return type.
   * @return Result of invoking the method per subcluster: nsId to result.
   * @throws IOException If requiredResponse=true and any of the calls throw an
   *           exception.
   */
  @Override
  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
      final Collection<T> locations, final RemoteMethod method,
      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
      throws IOException {
    invokeConcurrent(locations, method, standby, timeOutMs, clazz);
    asyncApply((ApplyFunction<List<RemoteResult<T, R>>, Object>)
        results -> postProcessResult(requireResponse, results));
    return asyncReturn(Map.class);
  }

  /**
   * Invokes multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param method The remote method and parameters to invoke.
   * @param timeOutMs Timeout for each individual call.
   * @param controller Fairness manager to control handlers assigned per NS.
   * @param orderedLocations List of remote locations to call concurrently.
   * @param callables Invoke method for each NameNode.
   * @return Result of invoking the method per subcluster (list of results),
   *         This includes the exception for each remote location.
   * @throws IOException If there are errors invoking the method.
   */
  @Override
  protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
      RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
      List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
    final Method m = method.getMethod();
    final CompletableFuture<Object>[] futures =
        new CompletableFuture[callables.size()];
    int i = 0;
    for (Callable<Object> callable : callables) {
      CompletableFuture<Object> future = null;
      try {
        callable.call();
        future = getCompletableFuture();
      } catch (Exception e) {
        future = new CompletableFuture<>();
        future.completeExceptionally(warpCompletionException(e));
      }
      futures[i++] = future;
    }

    asyncCompleteWith(CompletableFuture.allOf(futures)
        .handle((unused, throwable) -> {
          try {
            return processFutures(method, m, orderedLocations, Arrays.asList(futures));
          } catch (InterruptedException e) {
            LOG.error("Unexpected error while invoking API: {}", e.getMessage());
            throw warpCompletionException(new IOException(
                "Unexpected error while invoking API " + e.getMessage(), e));
          }
        }));
    return asyncReturn(List.class);
  }

  /**
   * Invokes a ClientProtocol method against the specified namespace.
   * <p>
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param location RemoteLocation to invoke.
   * @param method The remote method and parameters to invoke.
   * @return Result of invoking the method per subcluster (list of results),
   *         This includes the exception for each remote location.
   * @throws IOException If there are errors invoking the method.
   */
  @Override
  public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingle(
      T location, RemoteMethod method) throws IOException {
    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
    final Method m = method.getMethod();
    String ns = location.getNameserviceId();
    boolean isObserverRead = isObserverReadEligible(ns, m);
    final List<? extends FederationNamenodeContext> namenodes =
        getOrderedNamenodes(ns, isObserverRead);
    asyncTry(() -> {
      Class<?> proto = method.getProtocol();
      Object[] paramList = method.getParams(location);
      invokeMethod(ugi, namenodes, isObserverRead, proto, m, paramList);
      asyncApply((ApplyFunction<R, Object>) result -> {
        RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
        return Collections.singletonList(remoteResult);
      });
    });
    asyncCatch((o, ioe) -> {
      throw processException(ioe, location);
    }, IOException.class);
    return asyncReturn(List.class);
  }

  /**
   * Invokes a ClientProtocol method against the specified namespace.
   * <p>
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param nsId Target namespace for the method.
   * @param method The remote method and parameters to invoke.
   * @return The result of invoking the method.
   * @throws IOException If the invoke generated an error.
   */
  @Override
  public Object invokeSingle(final String nsId, RemoteMethod method)
      throws IOException {
    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
    boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
    List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
    RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
    Class<?> proto = method.getProtocol();
    Method m = method.getMethod();
    Object[] params = method.getParams(loc);
    invokeMethod(ugi, nns, isObserverRead, proto, m, params);
    return null;
  }

  /**
   * Invokes a single proxy call for a single location.
   * <p>
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param location RemoteLocation to invoke.
   * @param remoteMethod The remote method and parameters to invoke.
   * @param clazz Class for the return type.
   * @param <T> The type of the remote method return.
   * @return The result of invoking the method if successful.
   * @throws IOException If the invoke generated an error.
   */
  public <T> T invokeSingle(
      final RemoteLocationContext location,
      RemoteMethod remoteMethod, Class<T> clazz) throws IOException {
    List<RemoteLocationContext> locations = Collections.singletonList(location);
    invokeSequential(locations, remoteMethod);
    return asyncReturn(clazz);
  }

  /**
   * Release permit for specific nsId after processing against downstream
   * nsId is completed.
   *  @param nsId Identifier of the block pool.
   * @param ugi UserGroupIdentifier associated with the user.
   * @param m Remote method that needs to be invoked.
   * @param controller fairness policy controller to release permit from
   */
  protected void releasePermit(final String nsId, final UserGroupInformation ugi,
      final Method m, RouterRpcFairnessPolicyController controller) {
    if (controller != null) {
      controller.releasePermit(nsId);
      LOG.trace("Permit released for ugi: {} for method: {}", ugi,
          m.getName());
    }
  }
}