// AsyncUtil.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async.utils;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.CUR_COMPLETABLE_FUTURE;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;

/**
 * The AsyncUtil class provides a collection of utility methods to simplify
 * the implementation of asynchronous operations using Java's CompletableFuture.
 * It encapsulates common patterns such as applying functions, handling exceptions,
 * and executing tasks in a non-blocking manner. This class is designed to work
 * with Hadoop's asynchronous router operations in HDFS Federation.
 *
 * <p>The utility methods support a fluent-style API, allowing for the chaining of
 * asynchronous operations. For example, after an asynchronous operation completes,
 * a function can be applied to its result, and the process can continue with
 * the new result. This is particularly useful for complex workflows that require
 * multiple steps, where each step depends on the completion of the previous one.</p>
 *
 * <p>The class also provides methods to handle exceptions that may occur during an
 * asynchronous operation. This ensures that error handling is integrated smoothly
 * into the workflow, allowing for robust and fault-tolerant applications.</p>
 *
 * @see CompletableFuture
 * @see ApplyFunction
 * @see AsyncApplyFunction
 * @see AsyncRun
 * @see AsyncForEachRun
 * @see CatchFunction
 * @see AsyncCatchFunction
 * @see FinallyFunction
 * @see AsyncBiFunction
 */
public final class AsyncUtil {
  // Placeholder values handed back by asyncReturn(Class); the real result of
  // an asynchronous call is carried by CUR_COMPLETABLE_FUTURE instead.
  private static final Boolean BOOLEAN_RESULT = false;
  private static final Long LONG_RESULT = -1L;
  private static final Integer INT_RESULT = -1;
  private static final Object NULL_RESULT = null;

  private AsyncUtil(){}

  /**
   * Provides a default value based on the type specified.
   *
   * @param clazz The {@link Class} object representing the type of the value
   *              to be returned. May be {@code null}.
   * @param <R>   The type of the value to be returned.
   * @return An object with a value determined by the type:
   *         <ul>
   *           <li>{@code false} if {@code clazz} is {@link Boolean} or
   *               {@code boolean},
   *           <li>-1 if {@code clazz} is {@link Long} or {@code long},
   *           <li>-1 if {@code clazz} is {@link Integer} or {@code int},
   *           <li>{@code null} for any other type, or if {@code clazz}
   *               itself is {@code null}.
   *         </ul>
   */
  @SuppressWarnings("unchecked")
  public static <R> R asyncReturn(Class<R> clazz) {
    if (clazz == null) {
      return null;
    }
    // Each cast below is guarded by the preceding class comparison.
    if (clazz.equals(Boolean.class)
        || clazz.equals(boolean.class)) {
      return (R) BOOLEAN_RESULT;
    } else if (clazz.equals(Long.class)
        || clazz.equals(long.class)) {
      return (R) LONG_RESULT;
    } else if (clazz.equals(Integer.class)
        || clazz.equals(int.class)) {
      return (R) INT_RESULT;
    }
    return (R) NULL_RESULT;
  }

  /**
   * Synchronously returns the result of the current asynchronous operation.
   * This method is designed to be used in scenarios where the result of an
   * asynchronous operation is needed synchronously.
   *
   * <p>The method retrieves the current thread's {@link CompletableFuture} and
   * attempts to get the result. If the future is not yet complete, this
   * method will block until the result is available. If the future completed
   * exceptionally, the {@link ExecutionException} raised by
   * {@link CompletableFuture#get()} is unwrapped and its cause is rethrown,
   * so the caller sees the original failure rather than the wrapper.</p>
   *
   * <p>This method is typically used after an asynchronous operation has been
   * initiated and the caller needs to obtain the result in a synchronous
   * manner, for example, when bridging between asynchronous and synchronous
   * code paths.</p>
   *
   * @param <R> the type of the result to be returned
   * @param clazz the {@link Class} object representing the type of the value
   *              to be returned; it only drives type inference, the result
   *              itself is converted with an unchecked cast
   * @return the result of the asynchronous operation
   * @throws Exception the original exception if the future completed
   *                   exceptionally with an {@link Exception}; the
   *                   {@link ExecutionException} itself if its cause is
   *                   missing or is neither an {@link Exception} nor an
   *                   {@link Error}; or an {@link InterruptedException} if
   *                   the waiting thread is interrupted
   * @throws Error if the future completed exceptionally with an
   *               {@link Error}, which is rethrown unchanged
   */
  @SuppressWarnings("unchecked")
  public static <R> R syncReturn(Class<R> clazz)
      throws Exception {
    CompletableFuture<Object> completableFuture = CUR_COMPLETABLE_FUTURE.get();
    assert completableFuture != null;
    try {
      return (R) completableFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      // No cause, or a Throwable that is neither Exception nor Error:
      // rethrow the wrapper instead of failing with a ClassCastException or
      // NullPointerException that would hide the real failure.
      throw e;
    }
  }

  /**
   * Completes the current asynchronous operation with the specified value.
   * This method replaces the current thread's {@link CompletableFuture}
   * with a new, already completed future holding the provided value.
   *
   * @param value The value to complete the future with.
   * @param <R>    The type of the value to be completed.
   */
  public static <R> void asyncComplete(R value) {
    CUR_COMPLETABLE_FUTURE.set(
        CompletableFuture.completedFuture(value));
  }

  /**
   * Completes the current asynchronous operation with the specified completableFuture.
   * The given future is stored as the current thread's {@link CompletableFuture}.
   *
   * @param completableFuture The completableFuture to complete the future with.
   * @param <R>    The type of the value to be completed.
   */
  @SuppressWarnings("unchecked")
  public static <R> void asyncCompleteWith(CompletableFuture<R> completableFuture) {
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
  }

  /**
   * Get the {@link CompletableFuture} stored for the current thread, asserting
   * that one has been set. With assertions disabled this behaves like
   * {@link #getCompletableFuture()}.
   *
   * @return The completableFuture object.
   */
  public static CompletableFuture<Object> getAsyncUtilCompletableFuture() {
    CompletableFuture<Object> completableFuture = CUR_COMPLETABLE_FUTURE.get();
    assert completableFuture != null;
    return completableFuture;
  }

  /**
   * Completes the current asynchronous operation with an exception.
   * This method replaces the current thread's {@link CompletableFuture}
   * with a new future that is completed exceptionally, using the provided
   * {@link Throwable} (passed through {@code warpCompletionException}) as the cause.
   * This is typically used to handle errors in asynchronous operations.
   *
   * @param e The exception to complete the future exceptionally with.
   */
  public static void asyncThrowException(Throwable e) {
    CompletableFuture<Object> result = new CompletableFuture<>();
    result.completeExceptionally(warpCompletionException(e));
    CUR_COMPLETABLE_FUTURE.set(result);
  }

  /**
   * Applies an asynchronous function to the current {@link CompletableFuture}.
   * This method retrieves the current thread's {@link CompletableFuture} and applies
   * the provided {@link ApplyFunction} to it. It is used to chain asynchronous
   * operations, where the result of one operation is used as the input for the next.
   * The future returned by the function becomes the current thread's future.
   *
   * @param function The asynchronous function to apply, which takes a type T and
   *                 produces a type R.
   * @param <T>       The type of the input to the function.
   * @param <R>       The type of the result of the function.
   * @see CompletableFuture
   * @see ApplyFunction
   */
  @SuppressWarnings("unchecked")
  public static <T, R> void asyncApply(ApplyFunction<T, R> function) {
    CompletableFuture<T> completableFuture =
        (CompletableFuture<T>) CUR_COMPLETABLE_FUTURE.get();
    assert completableFuture != null;
    CompletableFuture<R> result = function.apply(completableFuture);
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
  }

  /**
   * Applies an asynchronous function to the current {@link CompletableFuture}
   * using the specified executor. This method retrieves the current thread's
   * {@link CompletableFuture} and applies the provided {@link ApplyFunction} to
   * it with the given executor service. It allows for more control over the
   * execution context, such as running the operation in a separate thread or
   * thread pool.
   *
   * <p>This is particularly useful when you need to perform blocking I/O operations
   * or other long-running tasks without blocking the main thread or
   * when you want to manage the thread resources more efficiently.</p>
   *
   * @param function The asynchronous function to apply, which takes a type T and
   *                 produces a type R.
   * @param executor The executor service used to run the asynchronous function.
   * @param <T> The type of the input to the function.
   * @param <R> The type of the result of the function.
   * @see CompletableFuture
   * @see ApplyFunction
   */
  @SuppressWarnings("unchecked")
  public static <T, R> void asyncApplyUseExecutor(
      ApplyFunction<T, R> function, Executor executor) {
    CompletableFuture<T> completableFuture =
        (CompletableFuture<T>) CUR_COMPLETABLE_FUTURE.get();
    assert completableFuture != null;
    CompletableFuture<R> result = function.apply(completableFuture, executor);
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
  }

  /**
   * Attempts to execute an asynchronous task defined by the provided
   * {@link AsyncRun} and associates it with the current thread's
   * {@link CompletableFuture}. This method is useful for trying operations
   * that may throw exceptions and handling them asynchronously.
   *
   * <p>The provided {@code asyncRun} is a functional interface that
   * encapsulates the logic to be executed asynchronously. The future it
   * returns becomes the current thread's future, allowing for chaining further
   * asynchronous operations based on the result or exception of this try.</p>
   *
   * <p>If {@code asyncRun} throws any {@link Throwable} while being started,
   * the throwable is not propagated to the caller; instead the current
   * thread's future is replaced by an exceptionally completed one via
   * {@link #asyncThrowException(Throwable)}. It can then be caught and
   * handled using the {@link #asyncCatch(CatchFunction, Class)} method,
   * allowing for error recovery or alternative processing.</p>
   *
   * @param asyncRun The asynchronous task to be executed, defined by
   *                  an {@link AsyncRun} instance.
   * @param <R>       The type of the result produced by the asynchronous task.
   * @see AsyncRun
   * @see #asyncCatch(CatchFunction, Class)
   */
  @SuppressWarnings("unchecked")
  public static <R> void asyncTry(AsyncRun<R> asyncRun) {
    try {
      CompletableFuture<R> result = asyncRun.async();
      CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
    } catch (Throwable e) {
      asyncThrowException(e);
    }
  }

  /**
   * Handles exceptions to a specified type that may occur during
   * an asynchronous operation. This method is used to catch and deal
   * with exceptions in a non-blocking manner, allowing the application
   * to continue processing even when errors occur.
   *
   * <p>The provided {@code function} is a {@link CatchFunction} that
   * defines how to handle the caught exception. It takes the result of
   * the asynchronous operation (if any) and the caught exception, and
   * returns a new result or modified result to continue the asynchronous
   * processing.</p>
   *
   * <p>The {@code eClass} parameter specifies the type of exceptions to
   * catch. Only exceptions that are instances of this type (or its
   * subclasses) will be caught and handled by the provided function.</p>
   *
   * @param function The {@link CatchFunction} that defines how to
   *                  handle the caught exception.
   * @param eClass   The class of the exception type to catch.
   * @param <R>       The type of the result of the asynchronous operation.
   * @param <E>       The type of the exception to catch.
   * @see CatchFunction
   */
  @SuppressWarnings("unchecked")
  public static <R, E extends Throwable> void asyncCatch(
      CatchFunction<R, E> function, Class<E> eClass) {
    CompletableFuture<R> completableFuture =
        (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
    assert completableFuture != null;
    CompletableFuture<R> result = function.apply(completableFuture, eClass);
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
  }

  /**
   * Executes a final action after an asynchronous operation
   * completes, regardless of whether the operation was successful
   * or resulted in an exception. This method provides a way to
   * perform cleanup or finalization tasks in an asynchronous
   * workflow.
   *
   * <p>The provided {@code function} is a {@link FinallyFunction}
   * that encapsulates the logic to be executed after the
   * asynchronous operation. It takes the result of the operation
   * and returns a new result, which can be used to continue the
   * asynchronous processing or to handle the final output of
   * the workflow.</p>
   *
   * <p>This method is particularly useful for releasing resources,
   * closing connections, or performing other cleanup actions that
   * need to occur after all other operations have completed.</p>
   *
   * @param function The {@link FinallyFunction} that defines
   *                  the final action to be executed.
   * @param <R>       The type of the result of the asynchronous
   *                   operation.
   * @see FinallyFunction
   */
  @SuppressWarnings("unchecked")
  public static <R> void asyncFinally(FinallyFunction<R> function) {
    CompletableFuture<R> completableFuture =
        (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
    assert completableFuture != null;
    CompletableFuture<R> result = function.apply(completableFuture);
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
  }

  /**
   * Executes an asynchronous operation for each element in an Iterator, applying
   * a given async function to each element. This method is part of the asynchronous
   * utilities provided to facilitate non-blocking operations on collections of elements.
   *
   * <p>The provided {@code asyncDo} is an {@link AsyncBiFunction} that encapsulates
   * the logic to be executed asynchronously for each element. The iteration itself
   * is delegated to a new {@link AsyncForEachRun} configured with the iterator and
   * the function.</p>
   *
   * @param forEach the Iterator over which to iterate and apply the async function
   * @param asyncDo the asynchronous function to apply to each element of the Iterator,
   *                 implemented as an {@link AsyncBiFunction}
   * @param <I> the type of the elements being iterated over
   * @param <R> the type of the result produced by the asynchronous task applied to each element
   * @see AsyncBiFunction
   * @see AsyncForEachRun
   */
  public static <I, R> void asyncForEach(
      Iterator<I> forEach, AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDo) {
    AsyncForEachRun<I, R> asyncForEachRun = new AsyncForEachRun<>();
    asyncForEachRun.forEach(forEach).asyncDo(asyncDo).run();
  }

  /**
   * Applies an asynchronous operation to each element of a collection
   * and aggregates the results. This method is designed to process a
   * collection of elements concurrently using asynchronous tasks, and
   * then combine the results into a single aggregated result.
   *
   * <p>The operation defined by {@code asyncDo} is applied to each
   * element of the collection. This operation is expected to return a
   * {@link CompletableFuture} representing the asynchronous task.
   * An {@link IOException} thrown while starting a task is converted into
   * an exceptionally completed future for that element; any other
   * throwable propagates to the caller of this method.
   * Once all tasks have been started, the method (async) waits for all of
   * them to complete and then uses the {@code then} function to
   * process and aggregate the results.</p>
   *
   * <p>The {@code then} function takes an array of {@link CompletableFuture}
   * instances, each representing the future result of an individual
   * asynchronous operation, in the iteration order of the collection.
   * It is invoked even when some of the futures completed exceptionally,
   * so it is responsible for inspecting each future. It should return a
   * new aggregated result based on these futures. This allows for various
   * forms of result aggregation, such as collecting all results into a list,
   * reducing them to a single value, or performing any other custom
   * aggregation logic.</p>
   *
   * @param collection the collection of elements to process.
   * @param asyncDo    the asynchronous operation to apply to each
   *                   element. It must return a {@link CompletableFuture}
   *                   representing the operation.
   * @param then        a function that takes an array of futures
   *                   representing the results of the asynchronous
   *                   operations and returns an aggregated result.
   * @param <I>        the type of the elements in the collection.
   * @param <R>        the type of the intermediate result from the
   *                   asynchronous operations.
   * @param <P>        the type of the final aggregated result.
   * @see CompletableFuture
   */
  @SuppressWarnings("unchecked")
  public static <I, R, P> void asyncCurrent(
      Collection<I> collection, AsyncApplyFunction<I, R> asyncDo,
      Function<CompletableFuture<R>[], P> then) {
    CompletableFuture<R>[] completableFutures =
        new CompletableFuture[collection.size()];
    int i = 0;
    for(I entry : collection) {
      CompletableFuture<R> future = null;
      try {
        future = asyncDo.async(entry);
      } catch (IOException e) {
        // Keep the slot for this element, but mark it as failed.
        future = new CompletableFuture<>();
        future.completeExceptionally(warpCompletionException(e));
      }
      completableFutures[i++] = future;
    }
    // handle() (not thenApply) so that 'then' runs even if a task failed.
    CompletableFuture<P> result = CompletableFuture.allOf(completableFutures)
        .handle((unused, throwable) -> then.apply(completableFutures));
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
  }

  /**
   * Get the CompletableFuture object stored in the current thread's local variable.
   *
   * @return The completableFuture object.
   */
  public static CompletableFuture<Object> getCompletableFuture() {
    return CUR_COMPLETABLE_FUTURE.get();
  }
}