AsyncForEachRun.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router.async.utils;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;

/**
 * The AsyncForEachRun class is part of the asynchronous operation utilities
 * within the Hadoop Distributed File System (HDFS) Federation router.
 * It provides the functionality to perform asynchronous operations on each
 * element of an Iterator, applying a given async function.
 *
 * <p>This class is designed to work with other asynchronous interfaces and
 * utility classes to enable complex asynchronous workflows. It allows for
 * non-blocking execution of tasks, which can improve the performance and
 * responsiveness of HDFS operations.</p>
 *
 * <p>The class implements the AsyncRun interface, which means it can be used
 * in asynchronous task chains. It maintains an Iterator of elements to
 * process and an asyncDoOnce function to apply to each element.</p>
 *
 * <p>The run method initiates the asynchronous operation, and the doOnce
 * method recursively applies the asyncDoOnce to each element and handles
 * the results. If the shouldBreak flag is set, the operation is completed
 * with the current result.</p>
 *
 * <p>AsyncForEachRun is used to implement the following semantics:</p>
 * <pre>
 * {@code
 * R result = null;
 * for (I element : elements) {
 *     result = asyncDoOnce(element);
 * }
 * return result;
 * }
 * </pre>
 *
 * @param <I> the type of the elements being iterated over
 * @param <R> the type of the result produced by applying the async function to an element
 * @see AsyncRun
 * @see AsyncBiFunction
 */
public class AsyncForEachRun<I, R> implements AsyncRun<R> {

  // Function applied asynchronously to every element. It is handed this
  // instance together with the element, so it can reach breakNow().
  private AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDoOnce;
  // Supplies the elements to visit, one per asynchronous step.
  private Iterator<I> iterator;
  // Raised by breakNow(); consulted each time an element's future completes
  // to decide whether another element is taken from the iterator.
  private boolean shouldBreak;

  /**
   * Starts the asynchronous foreach loop and publishes the future that
   * represents the whole loop.
   *
   * <p>Outcomes:</p>
   * <ul>
   *   <li>No iterator was configured, or it has no elements: the published
   *   future is already completed with {@code null}.</li>
   *   <li>Otherwise the first element is passed to {@code doOnce}, which
   *   chains the remaining elements; the future it returns is published.</li>
   *   <li>If that first call throws an {@link IOException}, the published
   *   future is completed exceptionally with the exception returned by
   *   {@code warpCompletionException} (presumably a
   *   {@link CompletionException} wrapping the IOException).</li>
   * </ul>
   *
   * <p>In every case the future is handed over through
   * {@link #setCurCompletableFuture(CompletableFuture)}, exactly once.</p>
   *
   * @see AsyncRun
   * @see Async#setCurCompletableFuture(CompletableFuture)
   */
  @Override
  public void run() {
    CompletableFuture<R> loopFuture;
    if (iterator != null && iterator.hasNext()) {
      try {
        loopFuture = doOnce(iterator.next());
      } catch (IOException ioe) {
        // The very first step failed synchronously: report it via the future
        // instead of throwing, since run() declares no checked exception.
        loopFuture = new CompletableFuture<>();
        loopFuture.completeExceptionally(warpCompletionException(ioe));
      }
    } else {
      // Nothing to iterate over: the loop result is null.
      loopFuture = CompletableFuture.completedFuture(null);
    }
    setCurCompletableFuture(loopFuture);
  }

  /**
   * Applies the async function to {@code element} and, once its future
   * completes normally, either continues with the next element or ends the
   * loop.
   *
   * <p>The loop ends when {@link #shouldBreak} has been set or the iterator is
   * exhausted; the result is then the future of the element just processed.
   * Otherwise this method calls itself with the next element. An
   * {@link IOException} thrown while starting a later element is rethrown
   * from the callback via {@code warpCompletionException}, so the returned
   * future completes exceptionally.</p>
   *
   * <p>NOTE(review): when the per-element futures are already complete the
   * callback runs on the calling thread, so the recursion depth grows with
   * the number of elements - confirm iterators stay small.</p>
   *
   * @param element the element to hand to the async function.
   * @return a future for the rest of the loop starting at {@code element}.
   * @throws IOException if the async function throws it for {@code element}.
   */
  private CompletableFuture<R> doOnce(I element) throws IOException {
    final CompletableFuture<R> current = asyncDoOnce.async(this, element);
    return current.thenCompose(ignored -> {
      // Check the break flag first so hasNext() is not consulted after a break.
      if (!shouldBreak && iterator.hasNext()) {
        try {
          return doOnce(iterator.next());
        } catch (IOException e) {
          throw warpCompletionException(e);
        }
      }
      // Loop finished: the last processed element's future is the result.
      return current;
    });
  }

  /**
   * Requests that the loop stop after the element currently being processed.
   *
   * <p>Only a flag is set here. It is checked when the current element's
   * future completes; if set, no further element is taken from the iterator
   * and the loop finishes with that element's result.</p>
   */
  public void breakNow() {
    shouldBreak = true;
  }

  /**
   * Configures the elements to iterate over.
   *
   * @param forEach the Iterator supplying the elements.
   * @return this instance, to allow call chaining.
   */
  public AsyncForEachRun<I, R> forEach(Iterator<I> forEach) {
    this.iterator = forEach;
    return this;
  }

  /**
   * Configures the async function that is applied to each element.
   *
   * @param asyncDo the function; it receives this instance and the element.
   * @return this instance, to allow call chaining.
   */
  public AsyncForEachRun<I, R> asyncDo(AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDo) {
    this.asyncDoOnce = asyncDo;
    return this;
  }
}