DefaultEventSource.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.sse.impl;

import static org.apache.hc.core5.http.ContentType.TEXT_EVENT_STREAM;

import java.io.InterruptedIOException;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.sse.BackoffStrategy;
import org.apache.hc.client5.http.sse.EventSource;
import org.apache.hc.client5.http.sse.EventSourceConfig;
import org.apache.hc.client5.http.sse.EventSourceListener;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.RequestNotExecutedException;
import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link EventSource} implementation that manages the SSE connection lifecycle:
 * establishing the connection, parsing events, handling failures, and performing
 * bounded, policy-driven reconnects.
 *
 * <p>Key responsibilities:</p>
 * <ul>
 *   <li>Builds and executes an HTTP GET with {@code Accept: text/event-stream}.</li>
 *   <li>Parses SSE using either a char-based or byte-based parser as configured
 *       by {@link SseParser}.</li>
 *   <li>Tracks {@code Last-Event-ID} and forwards events to the user listener
 *       on a caller-provided or inline executor.</li>
 *   <li>Applies {@link BackoffStrategy} with optional server-provided hints
 *       ({@code retry:} field and {@code Retry-After} header) to schedule reconnects.</li>
 *   <li>Honors a maximum reconnect count and emits {@link EventSourceListener#onClosed()}
 *       exactly once at the end of the lifecycle.</li>
 * </ul>
 *
 * <h3>Thread-safety</h3>
 * <p>Instances are safe for typical usage: public methods are idempotent and guarded by atomics.
 * Callbacks are dispatched on {@code callbackExecutor} (inline by default) and must not block.</p>
 *
 * <p><strong>Internal:</strong> this class is not part of the public API and can change without notice.</p>
 *
 * @since 5.7
 */
@Internal
public final class DefaultEventSource implements EventSource {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultEventSource.class);

    /**
     * Scalable shared scheduler used when callers do not provide their own.
     * Uses a small daemon pool; canceled tasks are removed to reduce heap churn.
     */
    private static final ScheduledExecutorService SHARED_SCHED;

    static {
        final int nThreads = Math.max(2, Math.min(8, Runtime.getRuntime().availableProcessors()));
        final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(
                nThreads, new DefaultThreadFactory("hc-sse", true));
        exec.setRemoveOnCancelPolicy(true);
        exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        SHARED_SCHED = exec;
    }

    private final CloseableHttpAsyncClient client;
    private final URI uri;
    private final Map<String, String> headers;
    private final EventSourceListener listener;

    private final ScheduledExecutorService scheduler;
    private final boolean ownScheduler;
    private final Executor callbackExecutor;
    private final BackoffStrategy backoff;
    private final int maxReconnects;
    private final SseParser parser;

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicBoolean closedOnce = new AtomicBoolean(false);
    private final AtomicBoolean connected = new AtomicBoolean(false);

    private volatile String lastEventId;
    /**
     * Sticky retry from SSE {@code retry:} field (ms); {@code -1} if not set.
     */
    private volatile long stickyRetryMs = -1L;
    /**
     * One-shot hint from HTTP {@code Retry-After} (ms); {@code -1} if absent.
     */
    private volatile long retryAfterHintMs = -1L;

    private final AtomicInteger attempts = new AtomicInteger(0);
    private volatile long previousDelayMs = 0L;
    private volatile Future<?> inFlight;

    /**
     * Creates a new {@code DefaultEventSource} using the shared scheduler, inline callback execution,
     * default config, and char-based parser.
     *
     * @param client   non-null async client
     * @param uri      non-null SSE endpoint
     * @param headers  initial headers (copied)
     * @param listener listener to receive events; may be {@code null} for a no-op
     */
    DefaultEventSource(final CloseableHttpAsyncClient client,
                       final URI uri,
                       final Map<String, String> headers,
                       final EventSourceListener listener) {
        this(client, uri, headers, listener, null, null, null, SseParser.CHAR);
    }

    /**
     * Creates a new {@code DefaultEventSource} with full control over scheduling, callback dispatch,
     * reconnect policy, and parser selection.
     *
     * @param client           non-null async client
     * @param uri              non-null SSE endpoint
     * @param headers          initial headers (copied)
     * @param listener         listener to receive events; may be {@code null} for a no-op
     * @param scheduler        optional scheduler; if {@code null}, a shared pool is used
     * @param callbackExecutor optional executor for listener callbacks; if {@code null}, runs inline
     * @param config           optional configuration; if {@code null}, {@link EventSourceConfig#DEFAULT} is used
     * @param parser           parser strategy ({@link SseParser#CHAR} or {@link SseParser#BYTE}); defaults to CHAR if {@code null}
     */
    public DefaultEventSource(final CloseableHttpAsyncClient client,
                       final URI uri,
                       final Map<String, String> headers,
                       final EventSourceListener listener,
                       final ScheduledExecutorService scheduler,
                       final Executor callbackExecutor,
                       final EventSourceConfig config,
                       final SseParser parser) {
        this.client = Objects.requireNonNull(client, "client");
        this.uri = Objects.requireNonNull(uri, "uri");
        this.headers = new ConcurrentHashMap<>(Objects.requireNonNull(headers, "headers"));
        this.listener = listener != null ? listener : (id, type, data) -> { /* no-op */ };

        if (scheduler != null) {
            this.scheduler = scheduler;
        } else {
            this.scheduler = SHARED_SCHED;
        }
        this.ownScheduler = false;

        this.callbackExecutor = callbackExecutor != null ? callbackExecutor : Runnable::run;

        final EventSourceConfig cfg = (config != null) ? config : EventSourceConfig.DEFAULT;
        this.backoff = cfg.backoff;
        this.maxReconnects = cfg.maxReconnects;
        this.parser = parser != null ? parser : SseParser.CHAR;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Idempotent. Resets retry state and attempts the first connection immediately.</p>
     *
     */
    @Override
    public void start() {
        if (started.compareAndSet(false, true)) {
            attempts.set(0);
            previousDelayMs = 0;
            connect(0L);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Idempotent. Cancels any in-flight exchange, shuts down the owned scheduler (if any),
     * and ensures {@link EventSourceListener#onClosed()} is invoked exactly once.</p>
     *
     */
    @Override
    public void cancel() {
        final Future<?> f = inFlight;
        if (f != null) {
            f.cancel(true);
        }
        if (cancelled.compareAndSet(false, true)) {
            connected.set(false);
            if (ownScheduler) {
                try {
                    scheduler.shutdownNow();
                } catch (final Exception ignore) {
                }
            }
            notifyClosedOnce();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String lastEventId() {
        return lastEventId;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void setLastEventId(final String id) {
        this.lastEventId = id;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void setHeader(final String name, final String value) {
        headers.put(name, value);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeHeader(final String name) {
        headers.remove(name);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Map<String, String> getHeaders() {
        return new ConcurrentHashMap<>(headers);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isConnected() {
        return connected.get();
    }

    /**
     * Schedules or immediately performs a connection attempt.
     *
     * @param delayMs delay in milliseconds; non-positive runs immediately
     */
    private void connect(final long delayMs) {
        if (cancelled.get()) {
            return;
        }
        final Runnable task = this::doConnect;
        try {
            if (delayMs <= 0L) {
                task.run();
            } else {
                scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
            }
        } catch (final RejectedExecutionException e) {
            if (!cancelled.get()) {
                dispatch(() -> listener.onFailure(e, false));
                notifyClosedOnce();
            }
        }
    }

    /**
     * Builds the request, installs the response consumer, and executes the exchange.
     *
     * <p>Completion/failure callbacks determine whether to reconnect based on
     * {@link #willReconnectNext()} and {@link #scheduleReconnect()}.</p>
     *
     */
    private void doConnect() {
        if (cancelled.get()) {
            return;
        }

        final SimpleRequestBuilder rb = SimpleRequestBuilder.get(uri);
        rb.setHeader(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM.getMimeType());
        rb.setHeader(HttpHeaders.CACHE_CONTROL, "no-cache");
        if (lastEventId != null) {
            rb.setHeader("Last-Event-ID", lastEventId);
        }
        for (final Map.Entry<String, String> e : headers.entrySet()) {
            rb.setHeader(e.getKey(), e.getValue());
        }
        final SimpleHttpRequest req = rb.build();

        final AsyncResponseConsumer<Void> consumer = getAsyncResponseConsumer();

        inFlight = client.execute(SimpleRequestProducer.create(req), consumer, new FutureCallback<Void>() {
            @Override
            public void completed(final Void v) {
                connected.set(false);
                if (cancelled.get()) {
                    notifyClosedOnce();
                    return;
                }
                if (willReconnectNext()) {
                    scheduleReconnect();
                } else {
                    notifyClosedOnce();
                }
            }

            @Override
            public void failed(final Exception ex) {
                connected.set(false);
                if (ex instanceof SseResponseConsumer.StopReconnectException) {
                    dispatch(() -> listener.onFailure(ex, false));
                    notifyClosedOnce();
                    return;
                }
                if (cancelled.get() || isBenignCancel(ex)) {
                    notifyClosedOnce();
                    return;
                }
                final boolean will = willReconnectNext();
                dispatch(() -> listener.onFailure(ex, will));
                if (will) {
                    scheduleReconnect();
                } else {
                    notifyClosedOnce();
                }
            }

            @Override
            public void cancelled() {
                connected.set(false);
                notifyClosedOnce();
            }
        });
    }

    /**
     * Creates the {@link AsyncResponseConsumer} chain for SSE, selecting the low-level
     * entity consumer per {@link SseParser} and capturing {@code Retry-After} hints.
     *
     * @return response consumer that feeds parsed events into the listener
     */
    private AsyncResponseConsumer<Void> getAsyncResponseConsumer() {
        final SseCallbacks cbs = new SseCallbacks() {
            @Override
            public void onOpen() {
                connected.set(true);
                attempts.set(0);
                previousDelayMs = 0;
                dispatch(listener::onOpen);
            }

            @Override
            public void onEvent(final String id, final String type, final String data) {
                if (id != null) {
                    lastEventId = id;
                }
                dispatch(() -> listener.onEvent(id, type, data));
            }

            @Override
            public void onRetry(final long retryMs) {
                stickyRetryMs = Math.max(0L, retryMs);
            }
        };

        final AsyncEntityConsumer<Void> entity =
                (parser == SseParser.BYTE) ? new ByteSseEntityConsumer(cbs)
                        : new SseEntityConsumer(cbs);

        return new SseResponseConsumer(entity, ms -> retryAfterHintMs = Math.max(0L, ms));
    }

    /**
     * Decides whether a subsequent reconnect should be attempted, without mutating state.
     *
     * <p>Respects {@code maxReconnects}. Delegates to {@link BackoffStrategy#shouldReconnect(int, long, Long)}.
     * If the strategy throws, the method returns {@code false} to avoid spin.</p>
     *
     * @return {@code true} if a reconnect should be attempted
     */
    private boolean willReconnectNext() {
        if (cancelled.get()) {
            return false;
        }
        if (maxReconnects >= 0 && attempts.get() >= maxReconnects) {
            return false;
        }

        final int nextAttempt = attempts.get() + 1;
        final Long hint = (retryAfterHintMs >= 0L) ? Long.valueOf(retryAfterHintMs)
                : (stickyRetryMs >= 0L ? stickyRetryMs : null);
        boolean decision;
        try {
            decision = backoff.shouldReconnect(nextAttempt, previousDelayMs, hint);
        } catch (final RuntimeException rex) {
            // be conservative: if strategy blew up, do not spin forever
            LOG.warn("BackoffStrategy.shouldReconnect threw: {}; stopping reconnects", rex.toString());
            decision = false;
        }
        return decision;
    }

    /**
     * Computes the next delay using the {@link BackoffStrategy} and schedules a reconnect.
     *
     * <p>Consumes the one-shot {@code Retry-After} hint if present; the SSE {@code retry:}
     * hint remains sticky until overridden by the server.</p>
     *
     */
    private void scheduleReconnect() {
        if (!willReconnectNext()) {
            notifyClosedOnce();
            return;
        }
        final int attempt = attempts.incrementAndGet();
        final Long hint = (retryAfterHintMs >= 0L) ? Long.valueOf(retryAfterHintMs)
                : (stickyRetryMs >= 0L ? stickyRetryMs : null);
        long d;
        try {
            d = backoff.nextDelayMs(attempt, previousDelayMs, hint);
        } catch (final RuntimeException rex) {
            LOG.warn("BackoffStrategy.nextDelayMs threw: {}; defaulting to 1000ms", rex.toString());
            d = 1000L;
        }
        previousDelayMs = Math.max(0L, d);
        retryAfterHintMs = -1L; // one-shot hint consumed
        connect(previousDelayMs);
    }

    /**
     * Dispatches a listener task using the configured executor, falling back
     * to the caller thread if submission fails.
     *
     * @param r task to run
     */
    private void dispatch(final Runnable r) {
        try {
            callbackExecutor.execute(r);
        } catch (final RuntimeException e) {
            try {
                r.run();
            } catch (final Exception ex) {
                LOG.error("EventSource listener failed after submit failure: {}", ex, ex);

            }
        }
    }

    /**
     * Ensures {@link EventSourceListener#onClosed()} is invoked at most once.
     *
     */
    private void notifyClosedOnce() {
        if (closedOnce.compareAndSet(false, true)) {
            connected.set(false);
            dispatch(listener::onClosed);
        }
    }

    /**
     * Returns {@code true} for failure types that are expected during cancel/close.
     *
     * @param ex the exception to inspect
     * @return {@code true} if the exception represents a benign cancellation
     */
    private static boolean isBenignCancel(final Exception ex) {
        return ex instanceof RequestNotExecutedException
                || ex instanceof ConnectionClosedException
                || ex instanceof CancellationException
                || ex instanceof InterruptedIOException;
    }
}