// LazySendingSubscriber.java

/*
 * Copyright 2017-2024 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.core.async.subscriber;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;

/**
 * This class waits for the first item of a publisher before completing an ExecutionFlow with a
 * publisher containing the same items.
 *
 * @param <T> The publisher item type
 * @since 4.8.0
 * @author Jonas Konrad
 */
@Internal
public final class LazySendingSubscriber<T> implements CoreSubscriber<T>, CorePublisher<T>, Subscription {
    private final DelayedExecutionFlow<Publisher<T>> result = DelayedExecutionFlow.create();
    private boolean receivedFirst = false;
    private volatile boolean sentFirst = false;
    private boolean sendingFirst = false;
    private T first;
    private Subscription upstream;
    private volatile CoreSubscriber<? super T> downstream;
    private Signal<? extends T> heldBackSignal;
    private long heldBackDemand = 0;

    private LazySendingSubscriber() {
    }

    /**
     * Create an {@link ExecutionFlow} that waits for the first item of the given publisher. If
     * there is an error before the first item, the flow will fail. If there is no error, the flow
     * will complete with a publisher containing all items, including the first one.
     *
     * @param input The input stream
     * @return A flow that will complete with the same stream
     * @param <T> The item type
     */
    @NonNull
    public static <T> ExecutionFlow<Publisher<T>> create(@NonNull Publisher<T> input) {
        LazySendingSubscriber<T> subscriber = new LazySendingSubscriber<>();
        input.subscribe(subscriber);
        return subscriber.result;
    }

    @Override
    public Context currentContext() {
        return downstream == null ? Context.empty() : downstream.currentContext();
    }

    @Override
    public void onSubscribe(Subscription s) {
        upstream = s;
        s.request(1);
    }

    @Override
    public void onNext(T t) {
        if (!receivedFirst) {
            receivedFirst = true;
            first = t;
            result.complete(this);
        } else {
            downstream.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (receivedFirst) {
            Subscriber<? super T> d;
            synchronized (this) {
                d = downstream;
                if (d == null || !sentFirst) {
                    heldBackSignal = Signal.error(t);
                    return;
                }
            }
            d.onError(t);
        } else {
            receivedFirst = true;
            result.completeExceptionally(t);
        }
    }

    @Override
    public void onComplete() {
        if (!receivedFirst) {
            onNext(null);
        }

        Subscriber<? super T> d;
        synchronized (this) {
            d = downstream;
            if (d == null || !sentFirst) {
                heldBackSignal = Signal.complete();
                return;
            }
        }
        d.onComplete();
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> subscriber) {
        synchronized (this) {
            downstream = subscriber;
        }
        subscriber.onSubscribe(this);
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {
        subscribe(Operators.toCoreSubscriber(s));
    }

    private static long saturatingAdd(long a, long b) {
        long sum = a + b;
        if (sum < a) {
            return Long.MAX_VALUE;
        }
        return sum;
    }

    @Override
    public void request(long n) {
        if (!sentFirst) {
            if (sendingFirst) {
                // we're currently running onNext, need to wait with the request() call.
                synchronized (this) {
                    if (!sentFirst) {
                        // hold back demand until onNext is done
                        heldBackDemand = saturatingAdd(heldBackDemand, n);
                        return;
                    }
                }
                // sentFirst became true
                upstream.request(n);
                return;
            }
            sendingFirst = true;
            if (first != null) {
                downstream.onNext(first); // note: this can trigger reentrancy!
                first = null;
            }
            Signal<? extends T> heldBackSignal;
            synchronized (this) {
                sentFirst = true;
                heldBackSignal = this.heldBackSignal;
                n = saturatingAdd(n, heldBackDemand);
            }
            if (heldBackSignal != null) {
                heldBackSignal.accept(downstream);
                return;
            }
            n--;
            if (n <= 0) {
                return;
            }
        }

        upstream.request(n);
    }

    @Override
    public void cancel() {
        if (!sentFirst) {
            sentFirst = true;
            T t = first;
            first = null;
            Operators.onNextDropped(t, currentContext());
        }
        upstream.cancel();
    }
}