CompletionStageMappingPublisher.java

package graphql.execution.reactive;

import graphql.Internal;
import org.jspecify.annotations.NonNull;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import static graphql.Assert.assertNotNullWithNPE;

/**
 * A reactive Publisher that bridges over another Publisher of `D` and maps the results
 * to type `U` via a CompletionStage, handling errors in that stage
 *
 * @param <D> the downstream type
 * @param <U> the upstream type to be mapped to
 */
@Internal
public class CompletionStageMappingPublisher<D, U> implements Publisher<D> {
    protected final Publisher<U> upstreamPublisher;
    protected final Function<U, CompletionStage<D>> mapper;

    /**
     * You need the following :
     *
     * @param upstreamPublisher an upstream source of data
     * @param mapper            a mapper function that turns upstream data into a promise of mapped D downstream data
     */
    public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<U, CompletionStage<D>> mapper) {
        this.upstreamPublisher = upstreamPublisher;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Subscriber<? super D> downstreamSubscriber) {
        assertNotNullWithNPE(downstreamSubscriber, () -> "Subscriber passed to subscribe must not be null");
        upstreamPublisher.subscribe(createSubscriber(downstreamSubscriber));
    }

    @NonNull
    protected Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) {
        return new CompletionStageSubscriber<>(mapper, downstreamSubscriber);
    }


    /**
     * Get instance of an upstreamPublisher
     *
     * @return upstream instance of {@link Publisher}
     */
    public Publisher<U> getUpstreamPublisher() {
        return upstreamPublisher;
    }

}