AsyncCatchFunction.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async.utils;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;

/**
 * A catch handler whose exception-handling logic is itself asynchronous.
 *
 * <p>{@code AsyncCatchFunction} specialises {@link CatchFunction} and belongs to
 * the asynchronous utilities of the HDFS Federation router, where it is combined
 * with sibling interfaces such as {@link AsyncRun} to assemble non-blocking
 * workflows.</p>
 *
 * <p>Implementors supply a single method, {@code applyAsync}, which receives the
 * result of the guarded operation (if there is one) together with the caught
 * exception and starts whatever handling is appropriate: logging, recovery, or
 * any other custom logic. The default methods declared here adapt that one
 * method to the other entry points:</p>
 * <ul>
 *   <li>{@code apply(R, E)} starts the handling and then returns the value
 *       produced by the inherited {@code result()};</li>
 *   <li>{@code async(R, E)} starts the handling and hands back the future
 *       obtained from the inherited {@code getCurCompletableFuture()}, so the
 *       handler can be chained with further asynchronous steps;</li>
 *   <li>{@code apply(CompletableFuture, Class)} attaches the handler to an
 *       existing future so that only failures of one exception type reach
 *       it.</li>
 * </ul>
 *
 * <p>Conceptually it is the asynchronous counterpart of:</p>
 * <pre>
 * {@code
 *    R res = null;
 *    try {
 *      res = doAsync1(input);
 *    } catch (E e) {
 *      // Can use AsyncCatchFunction
 *      res = doAsync2(res, e);
 *    }
 * }
 * </pre>
 *
 * @param <R> the type of the result of the asynchronous operation
 * @param <E> the type of the exception to catch, extending Throwable
 * @see CatchFunction
 * @see AsyncRun
 */
@FunctionalInterface
public interface AsyncCatchFunction<R, E extends Throwable>
    extends CatchFunction<R, E> {

  /**
   * Starts the asynchronous handling of a caught exception.
   *
   * <p>This is the single abstract method of the interface. Implementations are
   * expected to initiate the handling (logging, recovery, or other custom
   * logic) without blocking the calling thread.</p>
   *
   * @param r the result of the asynchronous operation, if any; may be null
   * @param e the caught exception
   * @throws IOException if an I/O error occurs while starting the handling
   */
  void applyAsync(R r, E e) throws IOException;

  /**
   * Handles the exception and returns the outcome in a synchronous style.
   *
   * <p>The call is forwarded to {@code applyAsync}, after which the value of
   * the inherited {@code result()} is returned; that method is expected to
   * yield the result of the {@link CompletableFuture} associated with the
   * current thread.</p>
   *
   * @param r the result of the asynchronous operation, if any; may be null
   * @param e the caught exception
   * @return the result after applying the function
   * @throws IOException if an I/O error occurs during the application of the function
   */
  @Override
  default R apply(R r, E e) throws IOException {
    // Kick off the handler first; result() then supplies the value to return.
    applyAsync(r, e);
    return result();
  }

  /**
   * Handles the exception and exposes the outcome as a future.
   *
   * <p>The call is forwarded to {@code applyAsync}; the future subsequently
   * returned by the inherited {@code getCurCompletableFuture()} is handed back
   * to the caller, which makes this method suitable for composing the handler
   * into a larger asynchronous pipeline.</p>
   *
   * @param r the result of the asynchronous operation, if any; may be null
   * @param e the caught exception
   * @return a CompletableFuture that will be completed with the result of the
   *         asynchronous operation
   * @throws IOException if an I/O error occurs while starting the asynchronous operation
   */
  default CompletableFuture<R> async(R r, E e) throws IOException {
    applyAsync(r, e);
    final CompletableFuture<R> pending = getCurCompletableFuture();
    // Only checked when the JVM runs with assertions enabled (-ea).
    assert pending != null;
    return pending;
  }

  /**
   * Attaches this catch function to a {@link CompletableFuture}, intercepting
   * only failures of the requested exception type.
   *
   * <p>The outcome of {@code in} is dispatched as follows:</p>
   * <ul>
   *   <li>normal completion: {@code in} itself is passed through, so its value
   *       becomes the value of the returned future;</li>
   *   <li>failure whose unwrapped cause is an instance of {@code eClazz}: the
   *       cause is given to {@code async}, and the future it returns determines
   *       the outcome; an {@link IOException} thrown while starting the handler
   *       is wrapped and propagated as the failure;</li>
   *   <li>any other failure: the original throwable is wrapped and
   *       propagated unchanged.</li>
   * </ul>
   *
   * @param in the input {@code CompletableFuture} to which the catch function is applied
   * @param eClazz the class object representing the exception type to catch
   * @return a new {@code CompletableFuture} that completes with the result of applying
   *         the catch function, or propagates the original exception if it does not match
   *         the specified type
   */
  @Override
  default CompletableFuture<R> apply(
      CompletableFuture<R> in, Class<E> eClazz) {
    return in.handle((value, failure) -> {
      // Success path: nothing to catch, forward the already-completed input.
      if (failure == null) {
        return in;
      }
      final Throwable cause = unWarpCompletionException(failure);
      // Not the exception type this handler is responsible for: rethrow as-is.
      if (!eClazz.isInstance(cause)) {
        throw warpCompletionException(failure);
      }
      try {
        return async(value, eClazz.cast(cause));
      } catch (IOException ioe) {
        throw warpCompletionException(ioe);
      }
      // handle() yields a future of a future; flatten it to a single level.
    }).thenCompose(handled -> handled);
  }
}