Async.java

package graphql.execution;

import graphql.Assert;
import graphql.Internal;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static graphql.Assert.assertTrue;

@Internal
@SuppressWarnings("FutureReturnValueIgnored")
public class Async {

    /**
     * A builder of materialized objects or {@link CompletableFuture}s than can present a promise to the list of them
     * <p>
     * This builder has a strict contract on size whereby if the expectedSize is five, then there MUST be five elements presented to it.
     *
     * @param <T> for two
     */
    public interface CombinedBuilder<T> {

        /**
         * This adds a {@link CompletableFuture} into the collection of results
         *
         * @param completableFuture the CF to add
         */
        void add(CompletableFuture<T> completableFuture);

        /**
         * This adds a new value which can be either a materialized value or a {@link CompletableFuture}
         *
         * @param object the object to add
         */
        void addObject(Object object);

        /**
         * This will return a {@code CompletableFuture<List<T>>} even if the inputs are all materialized values
         *
         * @return a CompletableFuture to a List of values
         */
        CompletableFuture<List<T>> await();

        /**
         * This will return a {@code CompletableFuture<List<T>>} if ANY of the input values are async
         * otherwise it just return a materialised {@code List<T>}
         *
         * @return either a CompletableFuture or a materialized list
         */
        /* CompletableFuture<List<T>> | List<T> */ Object awaitPolymorphic();
    }

    /**
     * Combines zero or more CFs into one. It is a wrapper around <code>CompletableFuture.allOf</code>.
     *
     * @param expectedSize how many we expect
     * @param <T>          for two
     *
     * @return a combined builder of CFs
     */
    public static <T> CombinedBuilder<T> ofExpectedSize(int expectedSize) {
        if (expectedSize == 0) {
            return new Empty<>();
        } else if (expectedSize == 1) {
            return new Single<>();
        } else {
            return new Many<>(expectedSize);
        }
    }

    private static class Empty<T> implements CombinedBuilder<T> {

        private int ix;

        @Override
        public void add(CompletableFuture<T> completableFuture) {
            this.ix++;
        }

        @Override
        public void addObject(Object object) {
            this.ix++;
        }

        @Override
        public CompletableFuture<List<T>> await() {
            assertTrue(ix == 0, "expected size was 0 got %d", ix);
            return typedEmpty();
        }

        @Override
        public Object awaitPolymorphic() {
            Assert.assertTrue(ix == 0, () -> "expected size was " + 0 + " got " + ix);
            return Collections.emptyList();
        }

        // implementation details: infer the type of Completable<List<T>> from a singleton empty
        private static final CompletableFuture<List<?>> EMPTY = CompletableFuture.completedFuture(Collections.emptyList());

        @SuppressWarnings("unchecked")
        private static <T> CompletableFuture<T> typedEmpty() {
            return (CompletableFuture<T>) EMPTY;
        }
    }

    private static class Single<T> implements CombinedBuilder<T> {

        // avoiding array allocation as there is only 1 CF
        private Object value;
        private int ix;

        @Override
        public void add(CompletableFuture<T> completableFuture) {
            this.value = completableFuture;
            this.ix++;
        }

        @Override
        public void addObject(Object object) {
            this.value = object;
            this.ix++;
        }

        @Override
        public CompletableFuture<List<T>> await() {
            commonSizeAssert();
            if (value instanceof CompletableFuture) {
                @SuppressWarnings("unchecked")
                CompletableFuture<T> cf = (CompletableFuture<T>) value;
                return cf.thenApply(Collections::singletonList);
            }
            //noinspection unchecked
            return CompletableFuture.completedFuture(Collections.singletonList((T) value));
        }

        @Override
        public Object awaitPolymorphic() {
            commonSizeAssert();
            if (value instanceof CompletableFuture) {
                @SuppressWarnings("unchecked")
                CompletableFuture<T> cf = (CompletableFuture<T>) value;
                return cf.thenApply(Collections::singletonList);
            }
            //noinspection unchecked
            return Collections.singletonList((T) value);
        }

        private void commonSizeAssert() {
            Assert.assertTrue(ix == 1, () -> "expected size was " + 1 + " got " + ix);
        }
    }

    private static class Many<T> implements CombinedBuilder<T> {

        private final Object[] array;
        private int ix;
        private int cfCount;

        private Many(int size) {
            this.array = new Object[size];
            this.ix = 0;
            cfCount = 0;
        }

        @Override
        public void add(CompletableFuture<T> completableFuture) {
            array[ix++] = completableFuture;
            cfCount++;
        }

        @Override
        public void addObject(Object object) {
            array[ix++] = object;
            if (object instanceof CompletableFuture) {
                cfCount++;
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public CompletableFuture<List<T>> await() {
            commonSizeAssert();

            CompletableFuture<List<T>> overallResult = new CompletableFuture<>();
            if (cfCount == 0) {
                overallResult.complete(materialisedList(array));
            } else {
                CompletableFuture<T>[] cfsArr = copyOnlyCFsToArray();
                CompletableFuture.allOf(cfsArr)
                        .whenComplete((ignored, exception) -> {
                            if (exception != null) {
                                overallResult.completeExceptionally(exception);
                                return;
                            }
                            List<T> results = new ArrayList<>(array.length);
                            if (cfsArr.length == array.length) {
                                // they are all CFs
                                for (CompletableFuture<T> cf : cfsArr) {
                                    results.add(cf.join());
                                }
                            } else {
                                // it's a mixed bag of CFs and materialized objects
                                for (Object object : array) {
                                    if (object instanceof CompletableFuture) {
                                        CompletableFuture<T> cf = (CompletableFuture<T>) object;
                                        // join is safe since they are all completed earlier via CompletableFuture.allOf()
                                        results.add(cf.join());
                                    } else {
                                        results.add((T) object);
                                    }
                                }
                            }
                            overallResult.complete(results);
                        });
            }
            return overallResult;
        }

        @SuppressWarnings("unchecked")
        @NonNull
        private CompletableFuture<T>[] copyOnlyCFsToArray() {
            if (cfCount == array.length) {
                // if it's all CFs - make a type safe copy via C code
                return Arrays.copyOf(array, array.length, CompletableFuture[].class);
            } else {
                int i = 0;
                CompletableFuture<T>[] dest = new CompletableFuture[cfCount];
                for (Object o : array) {
                    if (o instanceof CompletableFuture) {
                        dest[i] = (CompletableFuture<T>) o;
                        i++;
                    }
                }
                return dest;
            }
        }

        @Override
        public Object awaitPolymorphic() {
            if (cfCount == 0) {
                commonSizeAssert();
                return materialisedList(array);
            } else {
                return await();
            }
        }

        @NonNull
        private List<T> materialisedList(Object[] array) {
            List<T> results = new ArrayList<>(array.length);
            for (Object object : array) {
                //noinspection unchecked
                results.add((T) object);
            }
            return results;
        }

        private void commonSizeAssert() {
            Assert.assertTrue(ix == array.length, () -> "expected size was " + array.length + " got " + ix);
        }

    }

    @SuppressWarnings("unchecked")
    public static <T, U> CompletableFuture<List<U>> each(Collection<T> list, Function<T, Object> cfOrMaterialisedValueFactory) {
        Object l = eachPolymorphic(list, cfOrMaterialisedValueFactory);
        if (l instanceof CompletableFuture) {
            return (CompletableFuture<List<U>>) l;
        } else {
            return CompletableFuture.completedFuture((List<U>) l);
        }
    }

    /**
     * This will run the value factory for each of the values in the provided list.
     * <p>
     * If any of the values provided is a {@link CompletableFuture} it will return a {@link CompletableFuture} result object
     * that joins on all values otherwise if none of the values are a {@link CompletableFuture} then it will return a materialized list.
     *
     * @param list                         the list to work over
     * @param cfOrMaterialisedValueFactory the value factory to call for each iterm in the list
     * @param <T>                          for two
     *
     * @return a {@link CompletableFuture} to the list of resolved values or the list of values in a materialized fashion
     */
    public static <T> /* CompletableFuture<List<U>> | List<U> */ Object eachPolymorphic(Collection<T> list, Function<T, Object> cfOrMaterialisedValueFactory) {
        CombinedBuilder<Object> futures = ofExpectedSize(list.size());
        for (T t : list) {
            try {
                Object value = cfOrMaterialisedValueFactory.apply(t);
                futures.addObject(value);
            } catch (Exception e) {
                CompletableFuture<Object> cf = new CompletableFuture<>();
                // Async.each makes sure that it is not a CompletionException inside a CompletionException
                cf.completeExceptionally(new CompletionException(e));
                futures.add(cf);
            }
        }
        return futures.awaitPolymorphic();
    }

    public static <T, U> CompletableFuture<List<U>> eachSequentially(Iterable<T> list, BiFunction<T, List<U>, Object> cfOrMaterialisedValueFactory) {
        CompletableFuture<List<U>> result = new CompletableFuture<>();
        eachSequentiallyPolymorphicImpl(list.iterator(), cfOrMaterialisedValueFactory, new ArrayList<>(), result);
        return result;
    }

    @SuppressWarnings("unchecked")
    private static <T, U> void eachSequentiallyPolymorphicImpl(Iterator<T> iterator, BiFunction<T, List<U>, Object> cfOrMaterialisedValueFactory, List<U> tmpResult, CompletableFuture<List<U>> overallResult) {
        if (!iterator.hasNext()) {
            overallResult.complete(tmpResult);
            return;
        }
        Object value;
        try {
            value = cfOrMaterialisedValueFactory.apply(iterator.next(), tmpResult);
        } catch (Exception e) {
            overallResult.completeExceptionally(new CompletionException(e));
            return;
        }
        if (value instanceof CompletableFuture) {
            CompletableFuture<U> cf = (CompletableFuture<U>) value;
            cf.whenComplete((cfResult, exception) -> {
                if (exception != null) {
                    overallResult.completeExceptionally(exception);
                    return;
                }
                tmpResult.add(cfResult);
                eachSequentiallyPolymorphicImpl(iterator, cfOrMaterialisedValueFactory, tmpResult, overallResult);
            });
        } else {
            tmpResult.add((U) value);
            eachSequentiallyPolymorphicImpl(iterator, cfOrMaterialisedValueFactory, tmpResult, overallResult);
        }
    }


    /**
     * Turns an object T into a CompletableFuture if it's not already
     *
     * @param t   - the object to check
     * @param <T> for two
     *
     * @return a CompletableFuture
     */
    @SuppressWarnings("unchecked")
    public static <T> CompletableFuture<T> toCompletableFuture(Object t) {
        if (t instanceof CompletionStage) {
            return ((CompletionStage<T>) t).toCompletableFuture();
        } else {
            return CompletableFuture.completedFuture((T) t);
        }
    }

    /**
     * Turns a CompletionStage into a CompletableFuture if it's not already, otherwise leaves it alone
     * as a materialized object.
     *
     * @param object - the object to check
     *
     * @return a CompletableFuture from a CompletionStage or the materialized object itself
     */
    public static Object toCompletableFutureOrMaterializedObject(Object object) {
        if (object instanceof CompletionStage) {
            return ((CompletionStage<?>) object).toCompletableFuture();
        } else {
            return object;
        }
    }

    public static <T> CompletableFuture<T> tryCatch(Supplier<CompletableFuture<T>> supplier) {
        try {
            return supplier.get();
        } catch (Exception e) {
            CompletableFuture<T> result = new CompletableFuture<>();
            result.completeExceptionally(e);
            return result;
        }
    }

    public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable exception) {
        CompletableFuture<T> result = new CompletableFuture<>();
        result.completeExceptionally(exception);
        return result;
    }

    /**
     * If the passed in CompletableFuture is null, then it creates a CompletableFuture that resolves to null
     *
     * @param completableFuture the CF to use
     * @param <T>               for two
     *
     * @return the completableFuture if it's not null or one that always resoles to null
     */
    public static <T> @NonNull CompletableFuture<T> orNullCompletedFuture(@Nullable CompletableFuture<T> completableFuture) {
        return completableFuture != null ? completableFuture : CompletableFuture.completedFuture(null);
    }
}