ExecutionFlow.java

/*
 * Copyright 2017-2022 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.core.execution;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * The execution flow class represents a data flow which state can be represented as a simple imperative flow or an async/reactive.
 * The state can be resolved or lazy - based on the implementation.
 * NOTE: The instance of the flow is not supposed to be used after a mapping operator is used.
 *
 * @param <T> The flow type
 * @author Denis Stepanov
 * @since 4.0.0
 */
@Internal
public interface ExecutionFlow<T> {

    /**
     * Create a simple flow representing a value.
     *
     * @param value The value
     * @param <K>   The value type
     * @return a new flow
     */
    @NonNull
    static <K> ExecutionFlow<K> just(@Nullable K value) {
        return (ExecutionFlow<K>) new ImperativeExecutionFlowImpl(value, null);
    }

    /**
     * Create a simple flow representing an error.
     *
     * @param e   The exception
     * @param <K> The value type
     * @return a new flow
     */
    @NonNull
    static <K> ExecutionFlow<K> error(@NonNull Throwable e) {
        return (ExecutionFlow<K>) new ImperativeExecutionFlowImpl(null, e);
    }

    /**
     * Create a simple flow representing an empty state.
     *
     * @param <T>      The flow value type
     * @return a new flow
     */
    @NonNull
    static <T> ExecutionFlow<T> empty() {
        return (ExecutionFlow<T>) new ImperativeExecutionFlowImpl(null, null);
    }

    /**
     * Create a flow by invoking a supplier asynchronously.
     *
     * @param executor The executor
     * @param supplier The supplier
     * @param <T>      The flow value type
     * @return a new flow
     */
    @NonNull
    static <T> ExecutionFlow<T> async(@NonNull Executor executor, @NonNull Supplier<? extends ExecutionFlow<T>> supplier) {
        DelayedExecutionFlow<T> completableFuture = DelayedExecutionFlow.create();
        executor.execute(() -> supplier.get().onComplete(completableFuture::complete));
        return completableFuture;
    }

    /**
     * Map a not-empty value.
     *
     * @param transformer The value transformer
     * @param <R>         New value Type
     * @return a new flow
     */
    @NonNull
    <R> ExecutionFlow<R> map(@NonNull Function<? super T, ? extends R> transformer);

    /**
     * Map a not-empty value to a new flow.
     *
     * @param transformer The value transformer
     * @param <R>         New value Type
     * @return a new flow
     */
    @NonNull
    <R> ExecutionFlow<R> flatMap(@NonNull Function<? super T, ? extends ExecutionFlow<? extends R>> transformer);

    /**
     * Supply a new flow after the existing flow value is resolved.
     *
     * @param supplier The supplier
     * @param <R>      New value Type
     * @return a new flow
     */
    @NonNull
    <R> ExecutionFlow<R> then(@NonNull Supplier<? extends ExecutionFlow<? extends R>> supplier);

    /**
     * Supply a new flow if the existing flow is erroneous.
     *
     * @param fallback The fallback
     * @return a new flow
     */
    @NonNull
    ExecutionFlow<T> onErrorResume(@NonNull Function<? super Throwable, ? extends ExecutionFlow<? extends T>> fallback);

    /**
     * Store a contextual value.
     *
     * @param key   The key
     * @param value The value
     * @return a new flow
     */
    @NonNull
    ExecutionFlow<T> putInContext(@NonNull String key, @NonNull Object value);

    /**
     * Store a contextual value if it is absent.
     *
     * @param key   The key
     * @param value The value
     * @return a new flow
     * @since 4.8.0
     */
    @NonNull
    default ExecutionFlow<T> putInContextIfAbsent(@NonNull String key, @NonNull Object value) {
        return this;
    }

    /**
     * Invokes a provided function when the flow is resolved, or immediately if it is already done.
     *
     * @param fn The function
     */
    void onComplete(@NonNull BiConsumer<? super T, Throwable> fn);

    /**
     * Completes the flow to the completable future.
     *
     * @param completableFuture The completable future
     * @since 4.8
     */
    void completeTo(@NonNull CompletableFuture<T> completableFuture);

    /**
     * Create a new {@link ExecutionFlow} that either returns the same result or, if the timeout
     * expires before the result is received, a {@link java.util.concurrent.TimeoutException}.
     *
     * @param timeout   The timeout
     * @param scheduler Scheduler to schedule the timeout task
     * @param onDiscard An optional consumer to be called on the value of this flow if the flow
     *                  completes after the timeout has expired and thus the value is discarded
     * @return A new flow that will produce either the same value or a {@link java.util.concurrent.TimeoutException}
     */
    @NonNull
    default ExecutionFlow<T> timeout(@NonNull Duration timeout, @NonNull ScheduledExecutorService scheduler, @Nullable BiConsumer<T, Throwable> onDiscard) {
        DelayedExecutionFlow<T> delayed = DelayedExecutionFlow.create();
        AtomicBoolean completed = new AtomicBoolean(false);
        // schedule the timeout
        ScheduledFuture<?> future = scheduler.schedule(() -> {
            if (completed.compareAndSet(false, true)) {
                cancel();
                delayed.completeExceptionally(new TimeoutException());
            }
        }, timeout.toNanos(), TimeUnit.NANOSECONDS);
        // forward any result
        onComplete((t, throwable) -> {
            if (completed.compareAndSet(false, true)) {
                future.cancel(false);
                if (throwable != null) {
                    delayed.completeExceptionally(throwable);
                } else {
                    delayed.complete(t);
                }
            } else {
                if (onDiscard != null) {
                    onDiscard.accept(t, throwable);
                }
            }
        });
        // forward cancel from downstream
        delayed.onCancel(this::cancel);
        return delayed;
    }

    /**
     * Create an {@link ImperativeExecutionFlow} from this execution flow, if possible. The flow
     * will have its result immediately available.
     *
     * @return The imperative flow, or {@code null} if this flow is not complete or does not
     * support this operation
     */
    @Nullable
    ImperativeExecutionFlow<T> tryComplete();

    /**
     * Alternative to {@link #tryComplete()} which will unwrap the flow's value.
     *
     * @return The imperative flow then returns its value, or {@code null} if this flow is not complete or does not
     * support this operation
     * @since 4.3
     */
    @Nullable
    default T tryCompleteValue() {
        ImperativeExecutionFlow<T> imperativeFlow = tryComplete();
        if (imperativeFlow != null) {
            return imperativeFlow.getValue();
        }
        return null;
    }

    /**
     * Alternative to {@link #tryComplete()} which will unwrap the flow's error.
     *
     * @return The imperative flow then returns its error, or {@code null} if this flow is not complete or does not
     * support this operation
     * @since 4.3
     */
    @Nullable
    default Throwable tryCompleteError() {
        ImperativeExecutionFlow<T> imperativeFlow = tryComplete();
        if (imperativeFlow != null) {
            return imperativeFlow.getError();
        }
        return null;
    }

    /**
     * Converts the existing flow into the completable future.
     *
     * @return a {@link CompletableFuture} that represents the state if this flow.
     */
    @NonNull
    default CompletableFuture<T> toCompletableFuture() {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        completeTo(completableFuture);
        return completableFuture;
    }

    /**
     * Send an optional hint to the upstream producer that the result of this flow is no longer
     * needed and can be discarded. This is an optional operation, and has no effect if the flow
     * has already completed. After a cancellation, a flow might never complete.
     * <p>If this flow contains a resource that needs to be cleaned up (e.g. an
     * {@link java.io.InputStream}), the caller should still add a
     * {@link #onComplete completion listener} for cleanup, in case the upstream producer does not
     * support cancellation or has already submitted the result.
     *
     * @since 4.8.0
     */
    default void cancel() {
    }
}