ReactiveSupport.java

package graphql.execution.reactive;

import graphql.DuckTyped;
import graphql.Internal;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

/**
 * This provides support for a DataFetcher to be able to
 * return a reactive streams {@link Publisher} or Java JDK {@link Flow.Publisher}
 * as a value, and it can be turned into a {@link CompletableFuture}
 * that we can get an async value from.
 */
@Internal
public class ReactiveSupport {

    @DuckTyped(shape = "CompletableFuture | Object")
    public static Object fetchedObject(Object fetchedObject) {
        if (fetchedObject instanceof Flow.Publisher) {
            return flowPublisherToCF((Flow.Publisher<?>) fetchedObject);
        }
        if (fetchedObject instanceof Publisher) {
            return flowPublisherToCF(FlowAdapters.toFlowPublisher((Publisher<?>) fetchedObject));
        }
        return fetchedObject;
    }

    private static CompletableFuture<Object> flowPublisherToCF(Flow.Publisher<?> publisher) {
        FlowPublisherToCompletableFuture<Object> cf = new FlowPublisherToCompletableFuture<>();
        publisher.subscribe(cf);
        return cf;
    }

    /**
     * The implementations between reactive Publishers and Flow.Publishers are almost exactly the same except the
     * subscription class is different.  So this is a common class that contains most of the common logic
     *
     * @param <T> for two
     * @param <S> for subscription
     */
    private static abstract class PublisherToCompletableFuture<T, S> extends CompletableFuture<T> {

        private final AtomicReference<S> subscriptionRef = new AtomicReference<>();

        abstract void doSubscriptionCancel(S s);

        @SuppressWarnings("SameParameterValue")
        abstract void doSubscriptionRequest(S s, long n);

        private boolean validateSubscription(S current, S next) {
            Objects.requireNonNull(next, "Subscription cannot be null");
            if (current != null) {
                doSubscriptionCancel(next);
                return false;
            }
            return true;
        }

        /**
         * This overrides the {@link CompletableFuture#cancel(boolean)} method
         * such that subscription is also cancelled.
         *
         * @param mayInterruptIfRunning this value has no effect in this
         *                              implementation because interrupts are not used to control
         *                              processing.
         * @return a boolean if it was cancelled
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled) {
                S s = subscriptionRef.getAndSet(null);
                if (s != null) {
                    doSubscriptionCancel(s);
                }
            }
            return cancelled;
        }

        void onSubscribeImpl(S s) {
            if (validateSubscription(subscriptionRef.getAndSet(s), s)) {
                doSubscriptionRequest(s, Long.MAX_VALUE);
            }
        }

        void onNextImpl(T t) {
            S s = subscriptionRef.getAndSet(null);
            if (s != null) {
                complete(t);
                doSubscriptionCancel(s);
            }
        }

        void onErrorImpl(Throwable t) {
            if (subscriptionRef.getAndSet(null) != null) {
                completeExceptionally(t);
            }
        }

        void onCompleteImpl() {
            if (subscriptionRef.getAndSet(null) != null) {
                complete(null);
            }
        }
    }

    private static class FlowPublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Flow.Subscription> implements Flow.Subscriber<T> {

        @Override
        void doSubscriptionCancel(Flow.Subscription subscription) {
            subscription.cancel();
        }

        @Override
        void doSubscriptionRequest(Flow.Subscription subscription, long n) {
            subscription.request(n);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            onSubscribeImpl(s);
        }

        @Override
        public void onNext(T t) {
            onNextImpl(t);
        }

        @Override
        public void onError(Throwable t) {
            onErrorImpl(t);
        }

        @Override
        public void onComplete() {
            onCompleteImpl();
        }
    }
}