// SubscriptionReproduction.java

package reproductions;

import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.execution.SubscriptionExecutionStrategy;
import graphql.schema.DataFetchingEnvironment;
import graphql.schema.GraphQLSchema;
import graphql.schema.idl.RuntimeWiring;
import graphql.schema.idl.SchemaGenerator;
import graphql.schema.idl.SchemaParser;
import org.jspecify.annotations.NonNull;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static graphql.schema.idl.RuntimeWiring.newRuntimeWiring;
import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring;

/**
 * Stand-alone reproduction related to
 * <a href="https://github.com/spring-projects/spring-graphql/issues/949">spring-graphql issue 949</a>.
 * <p>
 * It executes a subscription whose source events and per-event field values are all produced
 * asynchronously with random delays, so that one can observe whether the delivered results keep
 * the order of the source events.
 * <p>
 * Pass {@code ordered} as the first program argument to put {@code true} under
 * {@link SubscriptionExecutionStrategy#KEEP_SUBSCRIPTION_EVENTS_ORDERED} in the GraphQL context;
 * with any other (or no) argument {@code false} is put there instead.
 */
public class SubscriptionReproduction {
    /**
     * Program entry point.
     *
     * @param args optional; {@code args[0] == "ordered"} turns on the ordered-events context flag
     */
    public static void main(String[] args) {
        new SubscriptionReproduction().run(args);
    }

    /**
     * Builds the schema, executes the subscription query and blocks until the resulting
     * publisher has completed or failed, printing every delivered event on the way.
     *
     * @param args the program arguments, see {@link #main(String[])}
     * @throws IllegalStateException if the execution did not yield a publisher
     */
    private void run(String[] args) {

        boolean ordered = args.length > 0 && "ordered".equals(args[0]);

        GraphQL graphQL = mkGraphQl();
        String query = "subscription MySubscription {\n" +
                "  searchVideo {\n" +
                "    id\n" +
                "    name\n" +
                "    lastEpisode\n" +
                "    isFavorite\n" +
                "  }\n" +
                "}";

        ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(query).graphQLContext(
                b -> b.put(SubscriptionExecutionStrategy.KEEP_SUBSCRIPTION_EVENTS_ORDERED, ordered)
        ).build();
        ExecutionResult executionResult = graphQL.execute(executionInput);
        Publisher<Map<String, Object>> publisher = executionResult.getData();
        if (publisher == null) {
            // Without this guard a failed execution would only surface as a bare
            // NullPointerException in eat(), hiding the actual GraphQL errors.
            throw new IllegalStateException(
                    "Subscription execution did not produce a publisher, errors : " + executionResult.getErrors());
        }

        DeathEater eater = new DeathEater();
        eater.eat(publisher);
    }

    /**
     * Creates the {@link GraphQL} instance: a schema with a single {@code searchVideo}
     * subscription field plus data fetchers for the source stream and for the
     * {@code name}, {@code isFavorite} and {@code lastEpisode} fields of {@code VideoSearch}.
     * The {@code id} field has no explicit data fetcher wired here.
     *
     * @return a ready-to-use GraphQL engine
     */
    private GraphQL mkGraphQl() {
        String sdl = "type Query { f : ID }" +
                "type Subscription {" +
                "   searchVideo : VideoSearch" +
                "}" +
                "type VideoSearch {" +
                "       id : ID" +
                "       name : String" +
                "       lastEpisode : String" +
                "       isFavorite : Boolean" +
                "}";
        RuntimeWiring runtimeWiring = newRuntimeWiring()
                .type(newTypeWiring("Subscription")
                        .dataFetcher("searchVideo", this::mkFluxDF)
                )
                .type(newTypeWiring("VideoSearch")
                        .dataFetcher("name", this::nameDF)
                        .dataFetcher("isFavorite", this::isFavoriteDF)
                        .dataFetcher("lastEpisode", this::lastEpisode)
                )
                .build();

        GraphQLSchema schema = new SchemaGenerator().makeExecutableSchema(
                new SchemaParser().parse(sdl), runtimeWiring
        );
        return GraphQL.newGraphQL(schema).build();
    }

    /**
     * Data fetcher for {@code Subscription.searchVideo}.
     * <p>
     * The returned future is completed asynchronously with a {@link Flux} that emits the values
     * for counters 0 through 10 (eleven events) and then completes, sleeping 10-100 ms after
     * each emission.
     *
     * @param env the data fetching environment (unused)
     * @return a future of the source event stream
     */
    private CompletableFuture<Flux<Object>> mkFluxDF(DataFetchingEnvironment env) {
        // async deliver of the publisher with random snoozing between values
        Supplier<Flux<Object>> fluxSupplier = () -> Flux.generate(() -> 0, (counter, sink) -> {
            sink.next(mkValue(counter));
            snooze(rand(10, 100));
            if (counter == 10) {
                // the value for counter 10 has just been emitted above; it is the last one
                sink.complete();
            }
            return counter + 1;
        });
        return CompletableFuture.supplyAsync(fluxSupplier);
    }

    /**
     * Data fetcher for {@code VideoSearch.isFavorite}.
     *
     * @param env the data fetching environment whose source is the event map
     * @return a future that yields {@code true} for even counters and {@code false} for odd ones
     */
    private Object isFavoriteDF(DataFetchingEnvironment env) {
        // async deliver of the isFavorite property with random delay (the delay is in getCounter)
        return CompletableFuture.supplyAsync(() -> {
            Integer counter = getCounter(env.getSource());
            return counter % 2 == 0;
        });
    }

    /**
     * Data fetcher for {@code VideoSearch.lastEpisode}.
     *
     * @param env the data fetching environment whose source is the event map
     * @return a future of {@code "episode-" + <thread name> + "for" + <counter>}
     */
    private Object lastEpisode(DataFetchingEnvironment env) {
        // Mono-based async property that uses CF as the interface
        // NOTE(review): publishOn only moves the downstream signals onto boundedElastic; the
        // callable itself (including the sleep inside getCounter) appears to run on the thread
        // that subscribes, i.e. the one calling toFuture(). If the blocking work is meant to be
        // offloaded, subscribeOn would be the operator to use - left untouched here so the
        // reproduction keeps its current behaviour.
        return Mono.fromCallable(() -> {
                    Integer counter = getCounter(env.getSource());
                    return "episode-" + Thread.currentThread().getName() + "for" + counter;
                })
                .publishOn(Schedulers.boundedElastic())
                .toFuture();
    }

    /**
     * Data fetcher for {@code VideoSearch.name}.
     *
     * @param env the data fetching environment whose source is the event map
     * @return a future of {@code "name" + <counter>}
     */
    private Object nameDF(DataFetchingEnvironment env) {
        // async deliver of the name property with random delay (the delay is in getCounter)
        return CompletableFuture.supplyAsync(() -> {
            Integer counter = getCounter(env.getSource());
            return "name" + counter;
        });
    }

    /**
     * Reads the {@code counter} entry of an event map and then sleeps 100-500 ms to simulate
     * slow field resolution.
     *
     * @param video the event map produced by {@link #mkValue(Integer)}
     * @return the stored counter, or 0 when the map has no {@code counter} entry
     */
    private static Integer getCounter(Map<String, Object> video) {
        Integer counter = (Integer) video.getOrDefault("counter", 0);
        snooze(rand(100, 500));
        return counter;
    }

    /**
     * Builds the source event for the given counter.
     *
     * @param counter the sequence number of the event
     * @return an immutable map holding the {@code counter} and its string form as {@code id}
     */
    private @NonNull Object mkValue(Integer counter) {
        // name, lastEpisode and isFavorite are future values via DFs
        return Map.of(
                "counter", counter,
                "id", String.valueOf(counter) // immediate value
        );
    }


    /**
     * Sleeps the current thread for the given time.
     *
     * @param ms the time to sleep in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
     */
    private static void snooze(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // restore the flag so code further up the stack can still see the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Shared source of the random delays. */
    static final Random rn = new Random();

    /**
     * Picks a random integer in the closed range {@code [min, max]}.
     *
     * @param min the smallest value that may be returned
     * @param max the largest value that may be returned
     * @return a random value between {@code min} and {@code max}, both inclusive
     */
    private static int rand(int min, int max) {
        return rn.nextInt(max - min + 1) + min;
    }

    /**
     * A subscriber that prints every signal it receives and records when the stream has
     * terminated, either normally or with an error.
     */
    public static class DeathEater implements Subscriber<Object> {
        private Subscription subscription;
        private final AtomicBoolean done = new AtomicBoolean();

        /**
         * @return {@code true} once {@code onComplete} or {@code onError} has been received
         */
        public boolean isDone() {
            return done.get();
        }

        /**
         * Stores the subscription and asks for the first 10 items.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            System.out.println("onSubscribe");
            subscription.request(10);
        }

        /**
         * Prints the item and asks for one more to replace it.
         */
        @Override
        public void onNext(Object o) {
            System.out.println("\tonNext : " + o);
            subscription.request(1);
        }

        /**
         * Prints the failure and marks this subscriber as done.
         */
        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError");
            throwable.printStackTrace(System.err);
            done.set(true);
        }

        /**
         * Prints a completion message and marks this subscriber as done.
         */
        @Override
        public void onComplete() {
            System.out.println("complete");
            done.set(true);
        }

        /**
         * Subscribes to the publisher and blocks the calling thread, polling every 2 ms,
         * until the stream has completed or failed.
         *
         * @param publisher the publisher to consume; must not be {@code null}
         */
        public void eat(Publisher<?> publisher) {
            publisher.subscribe(this);
            while (!this.isDone()) {
                snooze(2);
            }

        }
    }
}