RouterAsyncClientProtocol.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;

/**
 * Module that implements all the async RPC calls in {@link ClientProtocol} in the
 * {@link RouterRpcServer}.
 */
public class RouterAsyncClientProtocol extends RouterClientProtocol {
  private static final Logger LOG =
      LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName());

  private final RouterRpcServer rpcServer;
  private final RouterRpcClient rpcClient;
  private final RouterFederationRename rbfRename;
  private final FileSubclusterResolver subclusterResolver;
  private final ActiveNamenodeResolver namenodeResolver;
  /** If it requires response from all subclusters. */
  private final boolean allowPartialList;
  /** Time out when getting the mount statistics. */
  private long mountStatusTimeOut;
  /** Default nameservice enabled. */
  private final boolean defaultNameServiceEnabled;
  /** Identifier for the super user. */
  private String superUser;
  /** Identifier for the super group. */
  private final String superGroup;
  /**
   * Caching server defaults so as to prevent redundant calls to namenode,
   * similar to DFSClient, caching saves efforts when router connects
   * to multiple clients.
   */
  private volatile FsServerDefaults serverDefaults;

  public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
    super(conf, rpcServer);
    this.rpcServer = rpcServer;
    this.rpcClient = rpcServer.getRPCClient();
    this.rbfRename = getRbfRename();
    this.subclusterResolver = getSubclusterResolver();
    this.namenodeResolver = getNamenodeResolver();
    this.allowPartialList = isAllowPartialList();
    this.mountStatusTimeOut = getMountStatusTimeOut();
    this.superUser = getSuperUser();
    this.superGroup = getSuperGroup();
    this.defaultNameServiceEnabled = conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
  }

  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);
    long now = Time.monotonicNow();
    if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate()
        > getServerDefaultsValidityPeriod())) {
      RemoteMethod method = new RemoteMethod("getServerDefaults");
      rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
      asyncApply(o -> {
        serverDefaults = (FsServerDefaults) o;
        setServerDefaultsLastUpdate(now);
        return serverDefaults;
      });
    } else {
      asyncComplete(serverDefaults);
    }
    return asyncReturn(FsServerDefaults.class);
  }

  @Override
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
      String storagePolicy) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    if (createParent && rpcServer.isPathAll(src)) {
      int index = src.lastIndexOf(Path.SEPARATOR);
      String parent = src.substring(0, index);
      LOG.debug("Creating {} requires creating parent {}", src, parent);
      FsPermission parentPermissions = getParentPermission(masked);
      mkdirs(parent, parentPermissions, createParent);
      asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
        if (!success) {
          // This shouldn't happen as mkdirs returns true or exception
          LOG.error("Couldn't create parents for {}", src);
        }
        return success;
      });
    }

    RemoteMethod method = new RemoteMethod("create",
        new Class<?>[] {String.class, FsPermission.class, String.class,
            EnumSetWritable.class, boolean.class, short.class,
            long.class, CryptoProtocolVersion[].class,
            String.class, String.class},
        new RemoteParam(), masked, clientName, flag, createParent,
        replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
    final List<RemoteLocation> locations =
        rpcServer.getLocationsForPath(src, true);
    final RemoteLocation[] createLocation = new RemoteLocation[1];
    asyncTry(() -> {
      rpcServer.getCreateLocationAsync(src, locations);
      asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> {
        createLocation[0] = remoteLocation;
        rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class);
        asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> {
          status.setNamespace(remoteLocation.getNameserviceId());
          return status;
        });
      });
    });
    asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
          method, src, ioe, createLocation[0], locations);
      rpcClient.invokeSequential(
          newLocations, method, HdfsFileStatus.class, null);
    }, IOException.class);

    return asyncReturn(HdfsFileStatus.class);
  }

  @Override
  public LastBlockWithStatus append(
      String src, String clientName,
      EnumSetWritable<CreateFlag> flag) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
    RemoteMethod method = new RemoteMethod("append",
        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
        new RemoteParam(), clientName, flag);
    rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null);
    asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> {
      LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
      lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
      return lbws;
    });
    return asyncReturn(LastBlockWithStatus.class);
  }

  @Deprecated
  @Override
  public boolean rename(final String src, final String dst)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    final List<RemoteLocation> srcLocations =
        rpcServer.getLocationsForPath(src, true, false);
    final List<RemoteLocation> dstLocations =
        rpcServer.getLocationsForPath(dst, false, false);
    // srcLocations may be trimmed by getRenameDestinations()
    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
    if (locs.isEmpty()) {
      asyncComplete(
          rbfRename.routerFedRename(src, dst, srcLocations, dstLocations));
      return asyncReturn(Boolean.class);
    }
    RemoteMethod method = new RemoteMethod("rename",
        new Class<?>[] {String.class, String.class},
        new RemoteParam(), dstParam);
    isMultiDestDirectory(src);
    asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
      if (isMultiDestDirectory) {
        if (locs.size() != srcLocations.size()) {
          throw new IOException("Rename of " + src + " to " + dst + " is not"
              + " allowed. The number of remote locations for both source and"
              + " target should be same.");
        }
        rpcClient.invokeAll(locs, method);
      } else {
        rpcClient.invokeSequential(locs, method, Boolean.class,
            Boolean.TRUE);
      }
    });
    return asyncReturn(Boolean.class);
  }

  @Override
  public void rename2(
      final String src, final String dst,
      final Options.Rename... options) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    final List<RemoteLocation> srcLocations =
        rpcServer.getLocationsForPath(src, true, false);
    final List<RemoteLocation> dstLocations =
        rpcServer.getLocationsForPath(dst, false, false);
    // srcLocations may be trimmed by getRenameDestinations()
    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
    if (locs.isEmpty()) {
      rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
      return;
    }
    RemoteMethod method = new RemoteMethod("rename2",
        new Class<?>[] {String.class, String.class, options.getClass()},
        new RemoteParam(), dstParam, options);
    isMultiDestDirectory(src);
    asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
      if (isMultiDestDirectory) {
        if (locs.size() != srcLocations.size()) {
          throw new IOException("Rename of " + src + " to " + dst + " is not"
              + " allowed. The number of remote locations for both source and"
              + " target should be same.");
        }
        rpcClient.invokeConcurrent(locs, method);
      } else {
        rpcClient.invokeSequential(locs, method, null, null);
      }
    });
  }

  @Override
  public void concat(String trg, String[] src) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    // Concat only effects when all files in the same namespace.
    getFileRemoteLocation(trg);
    asyncApply((AsyncApplyFunction<RemoteLocation, Object>) targetDestination -> {
      if (targetDestination == null) {
        throw new IOException("Cannot find target file - " + trg);
      }
      String targetNameService = targetDestination.getNameserviceId();
      String[] sourceDestinations = new String[src.length];
      int[] index = new int[1];
      asyncForEach(Arrays.stream(src).iterator(), (forEachRun, sourceFile) -> {
        getFileRemoteLocation(sourceFile);
        asyncApply((ApplyFunction<RemoteLocation, Object>) srcLocation -> {
          if (srcLocation == null) {
            throw new IOException("Cannot find source file - " + sourceFile);
          }
          sourceDestinations[index[0]++] = srcLocation.getDest();
          if (!targetNameService.equals(srcLocation.getNameserviceId())) {
            throw new IOException("Cannot concatenate source file " + sourceFile
                + " because it is located in a different namespace" + " with nameservice "
                + srcLocation.getNameserviceId() + " from the target file with nameservice "
                + targetNameService);
          }
          return null;
        });
      });
      asyncApply((AsyncApplyFunction<Object, Object>) o -> {
        // Invoke
        RemoteMethod method = new RemoteMethod("concat",
            new Class<?>[] {String.class, String[].class},
            targetDestination.getDest(), sourceDestinations);
        rpcClient.invokeSingle(targetDestination, method, Void.class);
      });
    });
  }

  @Override
  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    final List<RemoteLocation> locations =
        rpcServer.getLocationsForPath(src, false);
    RemoteMethod method = new RemoteMethod("mkdirs",
        new Class<?>[] {String.class, FsPermission.class, boolean.class},
        new RemoteParam(), masked, createParent);

    // Create in all locations
    if (rpcServer.isPathAll(src)) {
      return rpcClient.invokeAll(locations, method);
    }

    if (locations.size() > 1) {
      // Check if this directory already exists
      asyncTry(() -> {
        getFileInfo(src);
        asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> {
          if (fileStatus != null) {
            // When existing, the NN doesn't return an exception; return true
            return true;
          }
          return false;
        });
      });
      asyncCatch((ret, ioe) -> {
        // Can't query if this file exists or not.
        LOG.error("Error getting file info for {} while proxying mkdirs: {}",
            src, ioe.getMessage());
        return false;
      }, IOException.class);

      asyncApply((AsyncApplyFunction<Boolean, Boolean>)ret -> {
        if (!ret) {
          final RemoteLocation firstLocation = locations.get(0);
          asyncTry(() -> {
            rpcClient.invokeSingle(firstLocation, method, Boolean.class);
          });
          asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
            final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
                method, src, ioe, firstLocation, locations);
            rpcClient.invokeSequential(
                newLocations, method, Boolean.class, Boolean.TRUE);
          }, IOException.class);
        } else {
          asyncComplete(ret);
        }
      });
    } else {
      final RemoteLocation firstLocation = locations.get(0);
      asyncTry(() -> {
        rpcClient.invokeSingle(firstLocation, method, Boolean.class);
      });
      asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
        final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
            method, src, ioe, firstLocation, locations);
        rpcClient.invokeSequential(
            newLocations, method, Boolean.class, Boolean.TRUE);
      }, IOException.class);
    }
    return asyncReturn(Boolean.class);
  }

  @Override
  public DirectoryListing getListing(
      String src, byte[] startAfter, boolean needLocation) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);
    GetListingComparator comparator = RouterClientProtocol.getComparator();
    getListingInt(src, startAfter, needLocation);
    asyncApply((AsyncApplyFunction<List<RemoteResult<RemoteLocation, DirectoryListing>>, Object>)
        listings -> {
        TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
        int totalRemainingEntries = 0;
        final int[] remainingEntries = {0};
        boolean namenodeListingExists = false;
        // Check the subcluster listing with the smallest name to make sure
        // no file is skipped across subclusters
        byte[] lastName = null;
        if (listings != null) {
          for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
            if (result.hasException()) {
              IOException ioe = result.getException();
              if (ioe instanceof FileNotFoundException) {
                RemoteLocation location = result.getLocation();
                LOG.debug("Cannot get listing from {}", location);
              } else if (!allowPartialList) {
                throw ioe;
              }
            } else if (result.getResult() != null) {
              DirectoryListing listing = result.getResult();
              totalRemainingEntries += listing.getRemainingEntries();
              HdfsFileStatus[] partialListing = listing.getPartialListing();
              int length = partialListing.length;
              if (length > 0) {
                HdfsFileStatus lastLocalEntry = partialListing[length-1];
                byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
                if (lastName == null ||
                    comparator.compare(lastName, lastLocalName) > 0) {
                  lastName = lastLocalName;
                }
              }
            }
          }

          // Add existing entries
          for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
            DirectoryListing listing = result.getResult();
            if (listing != null) {
              namenodeListingExists = true;
              for (HdfsFileStatus file : listing.getPartialListing()) {
                byte[] filename = file.getLocalNameInBytes();
                if (totalRemainingEntries > 0 &&
                    comparator.compare(filename, lastName) > 0) {
                  // Discarding entries further than the lastName
                  remainingEntries[0]++;
                } else {
                  nnListing.put(filename, file);
                }
              }
              remainingEntries[0] += listing.getRemainingEntries();
            }
          }
        }

        // Add mount points at this level in the tree
        final List<String> children = subclusterResolver.getMountPoints(src);
        if (children != null) {
          // Get the dates for each mount point
          Map<String, Long> dates = getMountPointDates(src);
          byte[] finalLastName = lastName;
          asyncForEach(children.iterator(), (forEachRun, child) -> {
            long date = 0;
            if (dates != null && dates.containsKey(child)) {
              date = dates.get(child);
            }
            Path childPath = new Path(src, child);
            getMountPointStatus(childPath.toString(), 0, date);
            asyncApply((ApplyFunction<HdfsFileStatus, Object>) dirStatus -> {
              // if there is no subcluster path, always add mount point
              byte[] bChild = DFSUtil.string2Bytes(child);
              if (finalLastName == null) {
                nnListing.put(bChild, dirStatus);
              } else {
                if (shouldAddMountPoint(bChild,
                    finalLastName, startAfter, remainingEntries[0])) {
                  // This may overwrite existing listing entries with the mount point
                  // TODO don't add if already there?
                  nnListing.put(bChild, dirStatus);
                }
              }
              return null;
            });
          });
          boolean finalNamenodeListingExists = namenodeListingExists;
          asyncApply(o -> {
            // Update the remaining count to include left mount points
            if (nnListing.size() > 0) {
              byte[] lastListing = nnListing.lastKey();
              for (int i = 0; i < children.size(); i++) {
                byte[] bChild = DFSUtil.string2Bytes(children.get(i));
                if (comparator.compare(bChild, lastListing) > 0) {
                  remainingEntries[0] += (children.size() - i);
                  break;
                }
              }
            }
            return finalNamenodeListingExists;
          });
        } else {
          asyncComplete(namenodeListingExists);
        }

        asyncApply((ApplyFunction<Boolean, Object>) exists -> {
          if (!exists && nnListing.size() == 0 && children == null) {
            // NN returns a null object if the directory cannot be found and has no
            // listing. If we didn't retrieve any NN listing data, and there are no
            // mount points here, return null.
            return null;
          }

          // Generate combined listing
          HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
          combinedData = nnListing.values().toArray(combinedData);
          return new DirectoryListing(combinedData, remainingEntries[0]);
        });
      });
    return asyncReturn(DirectoryListing.class);
  }

  /**
   * Get listing on remote locations.
   */
  @Override
  protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
      String src, byte[] startAfter, boolean needLocation) throws IOException {
    try {
      List<RemoteLocation> locations =
          rpcServer.getLocationsForPath(src, false, false);
      // Locate the dir and fetch the listing.
      if (locations.isEmpty()) {
        asyncComplete(new ArrayList<>());
        return asyncReturn(List.class);
      }
      RemoteMethod method = new RemoteMethod("getListing",
          new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
          new RemoteParam(), startAfter, needLocation);
      rpcClient.invokeConcurrent(locations, method, false, -1,
          DirectoryListing.class);
    } catch (NoLocationException | RouterResolveException e) {
      LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage());
      asyncComplete(new ArrayList<>());
    }

    return asyncReturn(List.class);
  }

  @Override
  public HdfsFileStatus getFileInfo(String src) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    final IOException[] noLocationException = new IOException[1];
    asyncTry(() -> {
      final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false, false);
      RemoteMethod method = new RemoteMethod("getFileInfo",
          new Class<?>[] {String.class}, new RemoteParam());
      // If it's a directory, we check in all locations
      if (rpcServer.isPathAll(src)) {
        getFileInfoAll(locations, method);
      } else {
        // Check for file information sequentially
        rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null);
      }
    });
    asyncCatch((o, e) -> {
      if (e instanceof NoLocationException
          || e instanceof RouterResolveException) {
        noLocationException[0] = e;
      } else {
        throw e;
      }
      return null;
    }, IOException.class);

    asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> {
      // If there is no real path, check mount points
      if (ret == null) {
        List<String> children = subclusterResolver.getMountPoints(src);
        if (children != null && !children.isEmpty()) {
          Map<String, Long> dates = getMountPointDates(src);
          long date = 0;
          if (dates != null && dates.containsKey(src)) {
            date = dates.get(src);
          }
          getMountPointStatus(src, children.size(), date, false);
        } else if (children != null) {
          // The src is a mount point, but there are no files or directories
          getMountPointStatus(src, 0, 0, false);
        } else {
          if (noLocationException[0] != null) {
            throw noLocationException[0];
          }
          asyncComplete(null);
          return;
        }
        asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) result -> {
          // Can't find mount point for path and the path didn't contain any sub monit points,
          // throw the NoLocationException to client.
          if (result == null && noLocationException[0] != null) {
            throw noLocationException[0];
          }
          return result;
        });
      } else {
        asyncComplete(ret);
      }
    });

    return asyncReturn(HdfsFileStatus.class);
  }

  @Override
  public RemoteLocation getFileRemoteLocation(String path) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
    if (locations.size() == 1) {
      asyncComplete(locations.get(0));
      return asyncReturn(RemoteLocation.class);
    }

    asyncForEach(locations.iterator(), (forEachRun, location) -> {
      RemoteMethod method =
          new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam());
      rpcClient.invokeSequential(Collections.singletonList(location), method,
          HdfsFileStatus.class, null);
      asyncApply((ApplyFunction<HdfsFileStatus, RemoteLocation>) ret -> {
        if (ret != null) {
          forEachRun.breakNow();
          return location;
        }
        return null;
      });
    });

    return asyncReturn(RemoteLocation.class);
  }

  @Override
  public HdfsFileStatus getMountPointStatus(
      String name, int childrenNum, long date, boolean setPath) {
    long modTime = date;
    long accessTime = date;
    final FsPermission[] permission = new FsPermission[]{FsPermission.getDirDefault()};
    final String[] owner = new String[]{this.superUser};
    final String[] group = new String[]{this.superGroup};
    final int[] childrenNums = new int[]{childrenNum};
    final EnumSet<HdfsFileStatus.Flags>[] flags =
        new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
    long inodeId = 0;
    HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
    if (setPath) {
      Path path = new Path(name);
      String nameStr = path.getName();
      builder.path(DFSUtil.string2Bytes(nameStr));
    }

    if (getSubclusterResolver() instanceof MountTableResolver) {
      asyncTry(() -> {
        String mName = name.startsWith("/") ? name : "/" + name;
        MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
        MountTable entry = mountTable.getMountPoint(mName);
        if (entry != null) {
          permission[0] = entry.getMode();
          owner[0] = entry.getOwnerName();
          group[0] = entry.getGroupName();

          RemoteMethod method = new RemoteMethod("getFileInfo",
              new Class<?>[] {String.class}, new RemoteParam());
          getFileInfoAll(
              entry.getDestinations(), method, mountStatusTimeOut);
          asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) fInfo -> {
            if (fInfo != null) {
              permission[0] = fInfo.getPermission();
              owner[0] = fInfo.getOwner();
              group[0] = fInfo.getGroup();
              childrenNums[0] = fInfo.getChildrenNum();
              flags[0] = DFSUtil
                  .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(),
                      fInfo.isSnapshotEnabled(), fInfo.hasAcl());
            }
            return builder.isdir(true)
                .mtime(modTime)
                .atime(accessTime)
                .perm(permission[0])
                .owner(owner[0])
                .group(group[0])
                .symlink(new byte[0])
                .fileId(inodeId)
                .children(childrenNums[0])
                .flags(flags[0])
                .build();
          });
        } else {
          asyncComplete(builder.isdir(true)
              .mtime(modTime)
              .atime(accessTime)
              .perm(permission[0])
              .owner(owner[0])
              .group(group[0])
              .symlink(new byte[0])
              .fileId(inodeId)
              .children(childrenNums[0])
              .flags(flags[0])
              .build());
        }
      });
      asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
        LOG.error("Cannot get mount point: {}", e.getMessage());
        return builder.isdir(true)
            .mtime(modTime)
            .atime(accessTime)
            .perm(permission[0])
            .owner(owner[0])
            .group(group[0])
            .symlink(new byte[0])
            .fileId(inodeId)
            .children(childrenNums[0])
            .flags(flags[0])
            .build();
      }, IOException.class);
    } else {
      try {
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        owner[0] = ugi.getUserName();
        group[0] = ugi.getPrimaryGroupName();
      } catch (IOException e) {
        String msg = "Cannot get remote user: " + e.getMessage();
        if (UserGroupInformation.isSecurityEnabled()) {
          LOG.error(msg);
        } else {
          LOG.debug(msg);
        }
      } finally {
        asyncComplete(builder.isdir(true)
            .mtime(modTime)
            .atime(accessTime)
            .perm(permission[0])
            .owner(owner[0])
            .group(group[0])
            .symlink(new byte[0])
            .fileId(inodeId)
            .children(childrenNums[0])
            .flags(flags[0])
            .build());
      }
    }
    return asyncReturn(HdfsFileStatus.class);
  }

  @Override
  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
      final RemoteMethod method, long timeOutMs) throws IOException {
    // Get the file info from everybody.
    rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
        HdfsFileStatus.class);
    asyncApply(res -> {
      Map<RemoteLocation, HdfsFileStatus> results = (Map<RemoteLocation, HdfsFileStatus>) res;
      int children = 0;
      // We return the first file.
      HdfsFileStatus dirStatus = null;
      for (RemoteLocation loc : locations) {
        HdfsFileStatus fileStatus = results.get(loc);
        if (fileStatus != null) {
          children += fileStatus.getChildrenNum();
          if (!fileStatus.isDirectory()) {
            return fileStatus;
          } else if (dirStatus == null) {
            dirStatus = fileStatus;
          }
        }
      }
      if (dirStatus != null) {
        return updateMountPointStatus(dirStatus, children);
      }
      return null;
    });
    return asyncReturn(HdfsFileStatus.class);
  }

  @Override
  public boolean recoverLease(String src, String clientName) throws IOException {
    super.recoverLease(src, clientName);
    return asyncReturn(boolean.class);
  }

  @Override
  public long[] getStats() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

    RemoteMethod method = new RemoteMethod("getStats");
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
    asyncApply(o -> {
      Map<FederationNamespaceInfo, long[]> results
          = (Map<FederationNamespaceInfo, long[]>) o;
      long[] combinedData = new long[STATS_ARRAY_LENGTH];
      for (long[] data : results.values()) {
        for (int i = 0; i < combinedData.length && i < data.length; i++) {
          if (data[i] >= 0) {
            combinedData[i] += data[i];
          }
        }
      }
      return combinedData;
    });
    return asyncReturn(long[].class);
  }

  @Override
  public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    RemoteMethod method = new RemoteMethod("getReplicatedBlockStats");
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(nss, method, true,
        false, ReplicatedBlockStats.class);
    asyncApply(o -> {
      Map<FederationNamespaceInfo, ReplicatedBlockStats> ret =
          (Map<FederationNamespaceInfo, ReplicatedBlockStats>) o;
      return ReplicatedBlockStats.merge(ret.values());
    });
    return asyncReturn(ReplicatedBlockStats.class);
  }

  @Override
  public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
    return rpcServer.getDatanodeReportAsync(type, true, 0);
  }

  @Override
  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
    return rpcServer.getSlowDatanodeReportAsync(true, 0);
  }

  @Override
  public DatanodeStorageReport[] getDatanodeStorageReport(
      HdfsConstants.DatanodeReportType type) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

    rpcServer.getDatanodeStorageReportMapAsync(type);
    asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>)
        dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
    return asyncReturn(DatanodeStorageReport[].class);
  }

  public DatanodeStorageReport[] getDatanodeStorageReport(
      HdfsConstants.DatanodeReportType type, boolean requireResponse, long timeOutMs)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

    rpcServer.getDatanodeStorageReportMapAsync(type, requireResponse, timeOutMs);
    asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>)
        dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
    return asyncReturn(DatanodeStorageReport[].class);
  }

  @Override
  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
                             boolean isChecked) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    // Set safe mode in all the name spaces
    RemoteMethod method = new RemoteMethod("setSafeMode",
        new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class},
        action, isChecked);
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(
        nss, method, true, !isChecked, Boolean.class);

    asyncApply(o -> {
      Map<FederationNamespaceInfo, Boolean> results
          = (Map<FederationNamespaceInfo, Boolean>) o;
      // We only report true if all the name space are in safe mode
      int numSafemode = 0;
      for (boolean safemode : results.values()) {
        if (safemode) {
          numSafemode++;
        }
      }
      return numSafemode == results.size();
    });
    return asyncReturn(Boolean.class);
  }

  @Override
  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

    RemoteMethod method = new RemoteMethod("saveNamespace",
        new Class<?>[] {long.class, long.class}, timeWindow, txGap);
    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(nss, method, true,
        false, boolean.class);

    asyncApply(o -> {
      Map<FederationNamespaceInfo, Boolean> ret =
          (Map<FederationNamespaceInfo, Boolean>) o;
      boolean success = true;
      for (boolean s : ret.values()) {
        if (!s) {
          success = false;
          break;
        }
      }
      return success;
    });
    return asyncReturn(Boolean.class);
  }

  @Override
  public long rollEdits() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(nss, method, true, false, long.class);
    asyncApply(o -> {
      Map<FederationNamespaceInfo, Long> ret =
          (Map<FederationNamespaceInfo, Long>) o;
      // Return the maximum txid
      long txid = 0;
      for (long t : ret.values()) {
        if (t > txid) {
          txid = t;
        }
      }
      return txid;
    });
    return asyncReturn(long.class);
  }

  @Override
  public boolean restoreFailedStorage(String arg) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

    RemoteMethod method = new RemoteMethod("restoreFailedStorage",
        new Class<?>[] {String.class}, arg);
    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
    asyncApply(o -> {
      Map<FederationNamespaceInfo, Boolean> ret =
          (Map<FederationNamespaceInfo, Boolean>) o;
      boolean success = true;
      for (boolean s : ret.values()) {
        if (!s) {
          success = false;
          break;
        }
      }
      return success;
    });
    return asyncReturn(boolean.class);
  }

  @Override
  public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction action)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    RemoteMethod method = new RemoteMethod("rollingUpgrade",
        new Class<?>[] {HdfsConstants.RollingUpgradeAction.class}, action);
    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();

    rpcClient.invokeConcurrent(
        nss, method, true, false, RollingUpgradeInfo.class);
    asyncApply(o -> {
      Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
          (Map<FederationNamespaceInfo, RollingUpgradeInfo>) o;
      // Return the first rolling upgrade info
      RollingUpgradeInfo info = null;
      for (RollingUpgradeInfo infoNs : ret.values()) {
        if (info == null && infoNs != null) {
          info = infoNs;
        }
      }
      return info;
    });
    return asyncReturn(RollingUpgradeInfo.class);
  }

  @Override
  public ContentSummary getContentSummary(String path) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    // Get the summaries from regular files
    final Collection<ContentSummary> summaries = new ArrayList<>();
    final List<RemoteLocation> locations = getLocationsForContentSummary(path);
    final RemoteMethod method = new RemoteMethod("getContentSummary",
        new Class<?>[] {String.class}, new RemoteParam());
    rpcClient.invokeConcurrent(locations, method,
        false, -1, ContentSummary.class);

    asyncApply(o -> {
      final List<RemoteResult<RemoteLocation, ContentSummary>> results =
          (List<RemoteResult<RemoteLocation, ContentSummary>>) o;

      FileNotFoundException notFoundException = null;
      for (RemoteResult<RemoteLocation, ContentSummary> result : results) {
        if (result.hasException()) {
          IOException ioe = result.getException();
          if (ioe instanceof FileNotFoundException) {
            notFoundException = (FileNotFoundException)ioe;
          } else if (!allowPartialList) {
            throw ioe;
          }
        } else if (result.getResult() != null) {
          summaries.add(result.getResult());
        }
      }

      // Throw original exception if no original nor mount points
      if (summaries.isEmpty() && notFoundException != null) {
        throw notFoundException;
      }
      return aggregateContentSummary(summaries);
    });

    return asyncReturn(ContentSummary.class);
  }

  @Override
  public long getCurrentEditLogTxid() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    RemoteMethod method = new RemoteMethod(
        "getCurrentEditLogTxid", new Class<?>[] {});
    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(nss, method, true, false, long.class);

    asyncApply(o -> {
      Map<FederationNamespaceInfo, Long> ret =
          (Map<FederationNamespaceInfo, Long>) o;
      // Return the maximum txid
      long txid = 0;
      for (long t : ret.values()) {
        if (t > txid) {
          txid = t;
        }
      }
      return txid;
    });
    return asyncReturn(long.class);
  }

  @Override
  public void msync() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
    // Only msync to nameservices with observer reads enabled.
    Set<FederationNamespaceInfo> allNamespaces = namenodeResolver.getNamespaces();
    RemoteMethod method = new RemoteMethod("msync");
    Set<FederationNamespaceInfo> namespacesEligibleForObserverReads = allNamespaces
        .stream()
        .filter(ns -> rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId()))
        .collect(Collectors.toSet());
    if (namespacesEligibleForObserverReads.isEmpty()) {
      asyncCompleteWith(CompletableFuture.completedFuture(null));
      return;
    }
    rpcClient.invokeConcurrent(namespacesEligibleForObserverReads, method);
  }

  @Override
  public boolean setReplication(String src, short replication)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
    RemoteMethod method = new RemoteMethod("setReplication",
        new Class<?>[] {String.class, short.class}, new RemoteParam(),
        replication);
    if (rpcServer.isInvokeConcurrent(src)) {
      rpcClient.invokeConcurrent(locations, method, Boolean.class);
      asyncApply(o -> {
        Map<RemoteLocation, Boolean> results = (Map<RemoteLocation, Boolean>) o;
        return !results.containsValue(false);
      });
    } else {
      rpcClient.invokeSequential(locations, method, Boolean.class,
          Boolean.TRUE);
    }
    return asyncReturn(boolean.class);
  }

  /**
   * Checks if the path is a directory and is supposed to be present in all
   * subclusters.
   * @param src the source path
   * @return true if the path is directory and is supposed to be present in all
   *         subclusters else false in all other scenarios.
   */
  @VisibleForTesting
  @Override
  public boolean isMultiDestDirectory(String src) {
    asyncTry(() -> {
      if (rpcServer.isPathAll(src)) {
        List<RemoteLocation> locations;
        locations = rpcServer.getLocationsForPath(src, false, false);
        RemoteMethod method = new RemoteMethod("getFileInfo",
            new Class<?>[] {String.class}, new RemoteParam());
        rpcClient.invokeSequential(locations,
            method, HdfsFileStatus.class, null);
        asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> {
          if (fileStatus != null) {
            return fileStatus.isDirectory();
          } else {
            LOG.debug("The destination {} doesn't exist.", src);
            return false;
          }
        });
      } else {
        asyncComplete(false);
      }
    });
    asyncCatch((CatchFunction<Object, UnresolvedPathException>) (o, e) -> {
      LOG.debug("The destination {} is a symlink.", src);
      return false;
    }, UnresolvedPathException.class);

    return asyncReturn(boolean.class);
  }

  @Override
  public Path getEnclosingRoot(String src) throws IOException {
    final Path[] mountPath = new Path[1];
    if (defaultNameServiceEnabled) {
      mountPath[0] = new Path("/");
    }

    if (subclusterResolver instanceof MountTableResolver) {
      MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
      if (mountTable.getMountPoint(src) != null) {
        mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
      }
    }

    if (mountPath[0] == null) {
      throw new IOException(String.format("No mount point for %s", src));
    }

    getEZForPath(src);
    asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
      if (zone == null) {
        return mountPath[0];
      } else {
        Path zonePath = new Path(zone.getPath());
        return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
      }
    });
    return asyncReturn(Path.class);
  }

  @Override
  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
    asyncComplete(getSecurityManager().getDelegationToken(renewer));
    return asyncReturn(Token.class);
  }

  @Override
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
    asyncComplete(getSecurityManager().renewDelegationToken(token));
    return asyncReturn(Long.class);
  }

  @Override
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
    getSecurityManager().cancelDelegationToken(token);
    asyncComplete(null);
  }

}