AsyncErasureCoding.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async;

import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.ErasureCoding;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;

/**
 * Provides asynchronous operations for erasure coding in HDFS Federation.
 * This class extends {@link org.apache.hadoop.hdfs.server.federation.router.ErasureCoding}
 * and overrides its methods to perform erasure coding operations in a non-blocking manner,
 * allowing for concurrent execution and improved performance.
 */
public class AsyncErasureCoding extends ErasureCoding {
  /** RPC server to receive client calls. */
  private final RouterRpcServer rpcServer;
  /** RPC clients to connect to the Namenodes. */
  private final RouterRpcClient rpcClient;
  /** Interface to identify the active NN for a nameservice or blockpool ID. */
  private final ActiveNamenodeResolver namenodeResolver;

  public AsyncErasureCoding(RouterRpcServer server) {
    super(server);
    this.rpcServer = server;
    this.rpcClient =  this.rpcServer.getRPCClient();
    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
  }

  /**
   * Asynchronously get an array of all erasure coding policies.
   * This method checks the operation category and then invokes the
   * getErasureCodingPolicies method concurrently across all namespaces.
   * <p>
   * The results are merged and returned as an array of ErasureCodingPolicyInfo.
   *
   * @return Array of ErasureCodingPolicyInfo.
   * @throws IOException If an I/O error occurs.
   */
  @Override
  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();

    rpcClient.invokeConcurrent(
        nss, method, true, false, ErasureCodingPolicyInfo[].class);
    asyncApply(
        (ApplyFunction<Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]>,
            ErasureCodingPolicyInfo[]>) ret -> merge(ret, ErasureCodingPolicyInfo.class));

    return asyncReturn(ErasureCodingPolicyInfo[].class);
  }

  /**
   * Asynchronously get the erasure coding codecs available.
   * This method checks the operation category and then invokes the
   * getErasureCodingCodecs method concurrently across all namespaces.
   * <p>
   * The results are merged into a single map of codec names to codec properties.
   *
   * @return Map of erasure coding codecs.
   * @throws IOException If an I/O error occurs.
   */
  @Override
  public Map<String, String> getErasureCodingCodecs() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    RemoteMethod method = new RemoteMethod("getErasureCodingCodecs");
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();

    rpcClient.invokeConcurrent(
        nss, method, true, false, Map.class);

    asyncApply((ApplyFunction<Map<FederationNamespaceInfo,
        Map<String, String>>, Map<String, String>>) retCodecs -> {
        Map<String, String> ret = new HashMap<>();
        Object obj = retCodecs;
        @SuppressWarnings("unchecked")
        Map<FederationNamespaceInfo, Map<String, String>> results =
            (Map<FederationNamespaceInfo, Map<String, String>>)obj;
        Collection<Map<String, String>> allCodecs = results.values();
        for (Map<String, String> codecs : allCodecs) {
          ret.putAll(codecs);
        }
        return ret;
      });

    return asyncReturn(Map.class);
  }

  /**
   * Asynchronously add an array of erasure coding policies.
   * This method checks the operation category and then invokes the
   * addErasureCodingPolicies method concurrently across all namespaces.
   * <p>
   * The results are merged and returned as an array of AddErasureCodingPolicyResponse.
   *
   * @param policies Array of erasure coding policies to add.
   * @return Array of AddErasureCodingPolicyResponse.
   * @throws IOException If an I/O error occurs.
   */
  @Override
  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
      ErasureCodingPolicy[] policies) throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    RemoteMethod method = new RemoteMethod("addErasureCodingPolicies",
        new Class<?>[] {ErasureCodingPolicy[].class}, new Object[] {policies});
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();

    rpcClient.invokeConcurrent(
        nss, method, true, false, AddErasureCodingPolicyResponse[].class);

    asyncApply(
        (ApplyFunction<Map<FederationNamespaceInfo, AddErasureCodingPolicyResponse[]>,
            AddErasureCodingPolicyResponse[]>) ret -> {
          return merge(ret, AddErasureCodingPolicyResponse.class);
        });
    return asyncReturn(AddErasureCodingPolicyResponse[].class);
  }

  /**
   * Asynchronously get the erasure coding policy for a given source path.
   * This method checks the operation category and then invokes the
   * getErasureCodingPolicy method sequentially for the given path.
   * <p>
   * The result is returned as an ErasureCodingPolicy object.
   *
   * @param src Source path to get the erasure coding policy for.
   * @return ErasureCodingPolicy for the given path.
   * @throws IOException If an I/O error occurs.
   */
  @Override
  public ErasureCodingPolicy getErasureCodingPolicy(String src)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    final List<RemoteLocation> locations =
        rpcServer.getLocationsForPath(src, false, false);
    RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy",
        new Class<?>[] {String.class}, new RemoteParam());
    rpcClient.invokeSequential(
        locations, remoteMethod, null, null);

    asyncApply(ret -> {
      return (ErasureCodingPolicy) ret;
    });

    return asyncReturn(ErasureCodingPolicy.class);
  }

  /**
   * Asynchronously get the EC topology result for the given policies.
   * This method checks the operation category and then invokes the
   * getECTopologyResultForPolicies method concurrently across all namespaces.
   * <p>
   * The results are merged and the first unsupported result is returned.
   *
   * @param policyNames Array of policy names to check.
   * @return ECTopologyVerifierResult for the policies.
   * @throws IOException If an I/O error occurs.
   */
  @Override
  public ECTopologyVerifierResult getECTopologyResultForPolicies(
      String[] policyNames) throws IOException {
    RemoteMethod method = new RemoteMethod("getECTopologyResultForPolicies",
        new Class<?>[] {String[].class}, new Object[] {policyNames});
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    if (nss.isEmpty()) {
      throw new IOException("No namespace availaible.");
    }

    rpcClient.invokeConcurrent(nss, method, true, false,
        ECTopologyVerifierResult.class);
    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, ECTopologyVerifierResult>,
        ECTopologyVerifierResult>) ret -> {
        for (Map.Entry<FederationNamespaceInfo, ECTopologyVerifierResult> entry :
            ret.entrySet()) {
            if (!entry.getValue().isSupported()) {
              return entry.getValue();
            }
          }
        // If no negative result, return the result from the first namespace.
        return ret.get(nss.iterator().next());
      });
    return asyncReturn(ECTopologyVerifierResult.class);
  }

  /**
   * Asynchronously get the erasure coding block group statistics.
   * This method checks the operation category and then invokes the
   * getECBlockGroupStats method concurrently across all namespaces.
   * <p>
   * The results are merged and returned as an ECBlockGroupStats object.
   *
   * @return ECBlockGroupStats for the erasure coding block groups.
   * @throws IOException If an I/O error occurs.
   */
  @Override
  public ECBlockGroupStats getECBlockGroupStats() throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ);

    RemoteMethod method = new RemoteMethod("getECBlockGroupStats");
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(
        nss, method, true, false, ECBlockGroupStats.class);

    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, ECBlockGroupStats>,
        ECBlockGroupStats>) allStats -> {
        return ECBlockGroupStats.merge(allStats.values());
      });
    return asyncReturn(ECBlockGroupStats.class);
  }
}