CompletionStageOrderedSubscriber.java
package graphql.execution.reactive;
import graphql.Internal;
import org.reactivestreams.Subscriber;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
/**
* This subscriber can be used to map between a {@link org.reactivestreams.Publisher} of U
* elements and map them into {@link CompletionStage} of D promises, and it keeps them in the order
* the Publisher provided them.
*
* @param <U> published upstream elements
* @param <D> mapped downstream values
*/
@Internal
public class CompletionStageOrderedSubscriber<U, D> extends CompletionStageSubscriber<U, D> implements Subscriber<U> {
public CompletionStageOrderedSubscriber(Function<U, CompletionStage<D>> mapper, Subscriber<? super D> downstreamSubscriber) {
super(mapper, downstreamSubscriber);
}
@Override
protected void whenNextFinished(CompletionStage<D> completionStage, D d, Throwable throwable) {
try {
if (throwable != null) {
handleThrowableDuringMapping(throwable);
} else {
emptyInFlightQueueIfWeCan();
}
} finally {
boolean empty = inFlightQIsEmpty();
finallyAfterEachPromiseFinishes(empty);
}
}
private void emptyInFlightQueueIfWeCan() {
// done inside a memory lock, so we cant offer new CFs to the queue
// until we have processed any completed ones from the start of
// the queue.
lock.runLocked(() -> {
//
// from the top of the in flight queue, take all the CFs that have
// completed... but stop if they are not done
while (!inFlightDataQ.isEmpty()) {
CompletionStage<?> cs = inFlightDataQ.peek();
if (cs != null) {
//
CompletableFuture<?> cf = cs.toCompletableFuture();
if (cf.isDone()) {
// take it off the queue
inFlightDataQ.poll();
D value;
try {
//noinspection unchecked
value = (D) cf.join();
} catch (RuntimeException rte) {
//
// if we get an exception while joining on a value, we
// send it into the exception handling and break out
handleThrowableDuringMapping(cfExceptionUnwrap(rte));
break;
}
downstreamSubscriber.onNext(value);
} else {
// if the CF is not done, then we have to stop processing
// to keep the results in order inside the inFlightQueue
break;
}
}
}
});
}
private Throwable cfExceptionUnwrap(Throwable throwable) {
if (throwable instanceof CompletionException & throwable.getCause() != null) {
return throwable.getCause();
}
return throwable;
}
}