DelayedSubscriber.java

/*
 * Copyright 2017-2024 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.core.async.publisher;

import io.micronaut.core.annotation.Internal;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * This is a {@link Processor} that does not change the stream, but allows the upstream
 * {@link org.reactivestreams.Publisher} and downstream {@link Subscriber} to be set independently
 * in any order. If the upstream is set first, this class makes sure that any early completion
 * is held back until the downstream has finished {@code onSubscribe}. If the downstream is set
 * first, this class makes sure any demand is stored until the upstream becomes available.
 *
 * <p>Pending signals (completion, demand, cancellation) are recorded under the monitor of this
 * object and are forwarded by a single drain loop ({@code work()}), which is never executed by
 * more than one caller at a time and never invokes the upstream or downstream while holding
 * the monitor.</p>
 *
 * @param <T> The forwarded type
 * @author Jonas Konrad
 * @since 4.4.0
 */
@Internal
public final class DelayedSubscriber<T> implements Processor<T, T>, Subscription {
    /**
     * Marker stored in {@link #completion} for a normal {@code onComplete}; any other non-null
     * value of that field is the {@link Throwable} passed to {@code onError}.
     */
    private static final Object COMPLETE = new Object();

    /**
     * {@code true} while some caller is inside the drain loop of {@link #work()}.
     */
    private boolean wip;

    /**
     * The upstream subscription, {@code null} until {@link #onSubscribe} was called.
     */
    private Subscription upstream;
    /**
     * The downstream subscriber, {@code null} until its {@code onSubscribe} has returned.
     */
    private Subscriber<? super T> downstream;
    /**
     * Pending terminal signal: {@code null} (none), {@link #COMPLETE} or a {@link Throwable}.
     */
    private Object completion;
    /**
     * Demand requested by the downstream that has not been forwarded to the upstream yet.
     * Never negative; saturates at {@link Long#MAX_VALUE}.
     */
    private long demand;
    /**
     * Pending cancellation that has not been forwarded to the upstream yet.
     */
    private boolean cancel;

    /**
     * Set the downstream subscriber. The subscriber receives this object as its
     * {@link Subscription}.
     *
     * @param s The downstream subscriber
     */
    @Override
    public void subscribe(Subscriber<? super T> s) {
        // The downstream field is only published after onSubscribe returned: work() does not
        // deliver completion or forward demand while the field is still null, so those signals
        // are held back until this point. Demand requested from inside onSubscribe is stored
        // in the demand field and picked up by the work() call below.
        s.onSubscribe(this);
        synchronized (this) {
            this.downstream = s;
        }
        work();
    }

    /**
     * Set the upstream subscription. Stored demand or a stored cancellation is forwarded to it
     * by the drain loop.
     *
     * @param s The upstream subscription
     */
    @Override
    public void onSubscribe(Subscription s) {
        synchronized (this) {
            this.upstream = s;
        }
        work();
    }

    /**
     * Forward an item to the downstream unchanged.
     *
     * @param t The item
     * @throws IllegalStateException If no downstream has been set yet
     */
    @Override
    public void onNext(T t) {
        Subscriber<? super T> downstream = this.downstream;
        if (downstream == null) {
            // demand is only forwarded once a downstream is set, so nothing was requested yet
            throw new IllegalStateException("onNext before legitimate request");
        }
        downstream.onNext(t);
    }

    /**
     * Record a failure; it is delivered to the downstream once the downstream is available.
     *
     * @param t The failure, must not be {@code null}
     * @throws NullPointerException If {@code t} is {@code null}
     */
    @Override
    public void onError(Throwable t) {
        if (t == null) {
            // null is the "no completion pending" state of the completion field, so a null
            // error would be dropped silently and the downstream would never terminate
            throw new NullPointerException("onError called with null Throwable");
        }
        synchronized (this) {
            completion = t;
        }
        work();
    }

    /**
     * Record normal completion; it is delivered to the downstream once the downstream is
     * available.
     */
    @Override
    public void onComplete() {
        synchronized (this) {
            completion = COMPLETE;
        }
        work();
    }

    /**
     * Add demand. The demand is stored until both upstream and downstream are available and is
     * then forwarded to the upstream. Stored demand saturates at {@link Long#MAX_VALUE} instead
     * of overflowing. Non-positive values do not change the stored demand.
     *
     * @param n The number of additional items requested
     */
    @Override
    public void request(long n) {
        synchronized (this) {
            if (n > 0) {
                // demand >= 0 and n > 0, so a negative sum can only mean overflow
                long sum = demand + n;
                demand = sum < 0 ? Long.MAX_VALUE : sum;
            }
        }
        work();
    }

    /**
     * Record a cancellation; it is forwarded to the upstream once the upstream is available.
     */
    @Override
    public void cancel() {
        synchronized (this) {
            cancel = true;
        }
        work();
    }

    /**
     * Drain loop. Each iteration picks at most one pending signal while holding the monitor
     * and then forwards it outside the monitor. Priority order: completion, then demand, then
     * cancellation. If another caller is already draining ({@link #wip} is set) this method
     * returns immediately; the active drainer re-checks the state after every forwarded signal
     * and therefore also handles signals recorded in the meantime, including re-entrant ones.
     */
    private void work() {
        boolean holdingWip = false;
        while (true) {
            Object completion = null;
            boolean cancel = false;
            long demand = 0;
            synchronized (this) {
                if (!holdingWip) {
                    if (wip) {
                        // someone else is draining and will see our state change
                        return;
                    }
                    wip = true;
                    holdingWip = true;
                }

                if (this.completion != null && downstream != null) {
                    completion = this.completion;
                    this.completion = null;
                } else if (this.demand != 0 && upstream != null && downstream != null) {
                    demand = this.demand;
                    this.demand = 0;
                } else if (this.cancel && upstream != null) {
                    cancel = true;
                    this.cancel = false;
                } else {
                    // nothing to do
                    wip = false;
                    return;
                }
            }

            // forward the picked signal without holding the monitor
            if (completion != null) {
                if (completion == COMPLETE) {
                    downstream.onComplete();
                } else {
                    downstream.onError((Throwable) completion);
                }
            } else if (demand != 0) {
                upstream.request(demand);
            } else {
                assert cancel;
                upstream.cancel();
            }
        }
    }
}