DelayedExecutionFlowImpl.java

/*
 * Copyright 2017-2023 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.core.execution;

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

@SuppressWarnings("rawtypes")
final class DelayedExecutionFlowImpl<T> implements DelayedExecutionFlow<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DelayedExecutionFlowImpl.class);

    /**
     * The head of the linked list of steps in this flow.
     */
    private Head head = new Head();
    /**
     * The tail of the linked list of steps in this flow.
     */
    private Step tail = head;
    private Runnable onCancel;
    private volatile boolean cancelled;

    /**
     * Perform the given step with the given item. Continue on until there is either no more steps,
     * either because onComplete was hit or because the consumer is not finished adding all the
     * steps, or until a step does not finish immediately, e.g. flatMap returning a non-immediate
     * flow.
     *
     * @param step The step to execute first
     * @param executionFlow The previous execution flow
     */
    private static void work(Step step, ExecutionFlow<Object> executionFlow) {
        do {
            executionFlow = step.apply(executionFlow);
            step = step.atomicSetOutput(executionFlow);
        } while (step != null);
    }

    @Override
    public void completeFrom(@NonNull ExecutionFlow<T> flow) {
        flow.onComplete(this::complete);
    }

    /**
     * Complete with initial execution flow.
     *
     * @param executionFlow The execution flow
     */
    private void complete0(@NonNull ExecutionFlow<Object> executionFlow) {
        if (head == null) {
            throw new IllegalStateException("Delayed flow has been completed");
        }
        Step immediateStep = head.atomicSetOutput(executionFlow);
        if (immediateStep != null) {
            work(immediateStep, executionFlow);
        }
        head = null;
    }

    @Override
    public void complete(T result) {
        complete0(result == null ? ExecutionFlow.empty() : ExecutionFlow.just(result));
    }

    @Override
    public void completeExceptionally(Throwable exc) {
        complete0(ExecutionFlow.error(exc));
    }

    /**
     * Add a new step to this flow.
     *
     * @param next The new step
     * @param <R> The return type of the flow for generics support
     * @return This flow
     */
    @SuppressWarnings("unchecked")
    private <R> ExecutionFlow<R> next(Step next) {
        Step oldTail = tail;
        if (oldTail instanceof DelayedExecutionFlowImpl.Cancel<?>) {
            // because the Cancel step can only cancel flows upstream of it, we can't allow adding
            // further downstream steps.
            throw new IllegalStateException("Cannot add more ExecutionFlow steps after cancellation");
        }
        tail = next;
        ExecutionFlow output = oldTail.atomicSetNext(next);
        if (output != null) {
            work(next, output);
        }
        return (ExecutionFlow<R>) this;
    }

    @Override
    public <R> ExecutionFlow<R> map(Function<? super T, ? extends R> transformer) {
        return next(new Map<>(transformer));
    }

    @Override
    public <R> ExecutionFlow<R> flatMap(Function<? super T, ? extends ExecutionFlow<? extends R>> transformer) {
        return next(new FlatMap<>(transformer));
    }

    @Override
    public <R> ExecutionFlow<R> then(Supplier<? extends ExecutionFlow<? extends R>> supplier) {
        return next(new Then<>(supplier));
    }

    @Override
    public ExecutionFlow<T> onErrorResume(Function<? super Throwable, ? extends ExecutionFlow<? extends T>> fallback) {
        return next(new OnErrorResume<>(fallback));
    }

    @Override
    public ExecutionFlow<T> putInContext(String key, Object value) {
        return this;
    }

    @Override
    public void onComplete(BiConsumer<? super T, Throwable> fn) {
        next(new OnComplete<>(fn));
    }

    @Override
    public void completeTo(CompletableFuture<T> completableFuture) {
        next(new OnCompleteToFuture<>(completableFuture));
    }

    @SuppressWarnings("unchecked")
    @Nullable
    @Override
    public ImperativeExecutionFlow<T> tryComplete() {
        ExecutionFlow tailOutput = tail.output;
        if (tailOutput != null) {
            return tailOutput.tryComplete();
        } else {
            return null;
        }
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public void cancel() {
        if (cancelled) {
            return;
        }
        next(new Cancel());
        cancelled = true;
        Runnable hook = this.onCancel;
        if (hook != null) {
            hook.run();
        }
    }

    @Override
    public void onCancel(Runnable hook) {
        Runnable prev = this.onCancel;
        if (prev != null) {
            this.onCancel = () -> {
                prev.run();
                hook.run();
            };
        } else {
            this.onCancel = hook;
        }
    }

    private abstract static sealed class Step<I, O> {
        /**
         * The next step to take, or {@code null} if there is no next step yet.
         */
        private volatile Step next;
        /**
         * The output of this step, or {@code null} if this step has not completed yet.
         */
        private volatile ExecutionFlow<Object> output;

        /**
         * Apply this step.
         *
         * @param input The input for the step
         * @return The return value of the {@code return*} method called
         */
        abstract ExecutionFlow<O> apply(ExecutionFlow<I> input);

        /**
         * Atomically set the output of this step. If this returns non-null, the caller must call
         * {@link #work(Step, ExecutionFlow)} with the returned step.
         *
         * @param output The output of this step
         * @return The next step to execute using {@link #work(Step, ExecutionFlow)}, or {@code null} if
         * the next step will be executed later
         */
        @Nullable
        final Step atomicSetOutput(ExecutionFlow<Object> output) {
            if (this.output != null) {
                // this is a best-effort check, the output field isn't always set
                throw new IllegalStateException("Already completed");
            }
            Step next = this.next;
            if (next != null) {
                return next;
            }
            this.output = output;
            next = this.next;
            if (next != null) {
                // another thread completed at the same time! one or both threads hit this sync
                // block.
                synchronized (this) {
                    // deconfliction path
                    next = this.next;
                    if (next != null) {
                        // our sync block was executed first, unset output so the other thread aborts
                        this.output = null;
                        return next;
                    }
                }
            }
            // no next step yet.
            return null;
        }

        /**
         * Atomically set the next step. If this returns non-null, the caller must call
         * {@link #work(Step, ExecutionFlow)} with the returned output value.
         *
         * @param next The next step to execute
         * @return The output flow value of this step, to be passed to {@link #work(Step, ExecutionFlow)}
         */
        @Nullable
        final ExecutionFlow<Object> atomicSetNext(Step next) {
            if (this.next != null) {
                // this is a best-effort check, the next field isn't always set
                throw new IllegalStateException("Already added a next step");
            }
            ExecutionFlow<Object> output = this.output;
            if (output != null) {
                return output;
            }
            this.next = next;
            output = this.output;
            if (output != null) {
                // another thread completed at the same time! one or both threads hit this sync
                // block.
                synchronized (this) {
                    // deconfliction path
                    output = this.output;
                    if (output != null) {
                        // our sync block was executed first, unset next so the other thread aborts
                        this.next = null;
                        return output;
                    }
                }
            }
            // no output yet.
            return null;
        }

        /**
         * Return an immediate failed value from this step (e.g. from map).
         *
         * @param e The exception to return
         * @return The value to return from {@link #work}
         */
        final <O> ExecutionFlow<O> returnError(Throwable e) {
            return ExecutionFlow.error(e);
        }
    }

    /**
     * Mock step used as the head of the linked list of steps.
     */
    private static final class Head extends Step<Object, Object> {

        @Override
        ExecutionFlow<Object> apply(ExecutionFlow<Object> input) {
            throw new UnsupportedOperationException();
        }

    }

    private static final class Map<I, O> extends Step<I, O> {
        private final Function<? super I, ? extends O> transformer;

        private Map(Function<? super I, ? extends O> transformer) {
            this.transformer = transformer;
        }

        @Override
        ExecutionFlow<O> apply(ExecutionFlow<I> executionFlow) {
            try {
                return executionFlow.map(transformer);
            } catch (Exception e) {
                return returnError(e);
            }
        }
    }

    private static final class FlatMap<I, O>  extends Step<I, O> {
        private final Function<? super I, ? extends ExecutionFlow<? extends O>> transformer;

        private FlatMap(Function<? super I, ? extends ExecutionFlow<? extends O>> transformer) {
            this.transformer = transformer;
        }

        @Override
        ExecutionFlow<O> apply(ExecutionFlow<I> executionFlow) {
            try {
                return executionFlow.flatMap(transformer);
            } catch (Exception e) {
                return returnError(e);
            }
        }
    }

    private static final class Then<I, O> extends Step<I, O> {
        private final Supplier<? extends ExecutionFlow<? extends O>> transformer;

        private Then(Supplier<? extends ExecutionFlow<? extends O>> transformer) {
            this.transformer = transformer;
        }

        @Override
        ExecutionFlow<O> apply(ExecutionFlow<I> executionFlow) {
            try {
                return executionFlow.then(transformer);
            } catch (Exception e) {
                return returnError(e);
            }
        }
    }

    private static final class OnErrorResume<I> extends Step<I, I> {
        private final Function<? super Throwable, ? extends ExecutionFlow<? extends I>> fallback;

        private OnErrorResume(Function<? super Throwable, ? extends ExecutionFlow<? extends I>> fallback) {
            this.fallback = fallback;
        }

        @Override
        ExecutionFlow<I> apply(ExecutionFlow<I> executionFlow) {
            try {
                return executionFlow.onErrorResume(fallback);
            } catch (Exception e) {
                return returnError(e);
            }
        }
    }

    private static final class OnComplete<E> extends Step<E, E> {
        private final BiConsumer<? super E, Throwable> consumer;

        public OnComplete(BiConsumer<? super E, Throwable> consumer) {
            this.consumer = consumer;
        }

        @Override
        ExecutionFlow<E> apply(ExecutionFlow<E> executionFlow) {
            try {
                executionFlow.onComplete(consumer);
            } catch (Exception e) {
                LOG.error("Failed to execute onComplete", e);
            }
            return executionFlow;
        }
    }

    private static final class OnCompleteToFuture<E> extends Step<E, E> {
        private final CompletableFuture<E> future;

        public OnCompleteToFuture(CompletableFuture<E> future) {
            this.future = future;
        }

        @Override
        ExecutionFlow<E> apply(ExecutionFlow<E> executionFlow) {
            try {
                executionFlow.completeTo(future);
            } catch (Exception e) {
                LOG.error("Failed to execute onComplete", e);
            }
            return executionFlow;
        }
    }

    private static final class Cancel<E> extends Step<E, E> {
        private static final ExecutionFlow ERR = ExecutionFlow.error(new AssertionError("Should never be hit, no further steps are allowed after cancel"));

        @Override
        ExecutionFlow<E> apply(ExecutionFlow<E> input) {
            input.cancel();
            return ERR;
        }
    }
}