SseExecutor.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.sse.impl.DefaultEventSource;
import org.apache.hc.client5.http.sse.impl.SseParser;
import org.apache.hc.core5.reactor.IOReactorStatus;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
/**
* Entry point for creating and managing {@link EventSource} instances backed by an
* {@link CloseableHttpAsyncClient}.
*
* <p>This type provides:
* <ul>
* <li>A process-wide shared async client (see {@link #newInstance()}),</li>
* <li>Factory methods that accept a caller-supplied client (see {@link #newInstance(CloseableHttpAsyncClient)}),</li>
* <li>A builder for fine-grained defaults (headers, backoff, parser, executors) applied to
* all streams opened via this executor (see {@link #custom()}).</li>
* </ul>
*
* <p><strong>Lifecycle.</strong> When using the shared client, {@link #close()} is a no-op,
* and the client remains available process-wide until {@link #closeSharedClient()} is called.
* When you supply your own client, {@link #close()} will close that client.</p>
*
* <p><strong>Thread-safety.</strong> Instances are thread-safe. Methods may be called from any thread.</p>
*
* <p><strong>Usage example</strong></p>
* <pre>{@code
* SseExecutor exec = SseExecutor.custom()
* .setDefaultBackoff(new ExponentialJitterBackoff(1000, 30000, 2.0, 250))
* .setDefaultMaxReconnects(-1)
* .build();
*
* EventSource es = exec.open(URI.create("https://example/sse"),
* Collections.singletonMap("X-Token", "abc"),
* new EventSourceListener() {
* public void onEvent(String id, String type, String data) {
* System.out.println("event " + type + ": " + data);
* }
* });
*
* es.start();
* }</pre>
*
* @since 5.7
*/
public final class SseExecutor {
// Visible for tests
static final ReentrantLock LOCK = new ReentrantLock();
static volatile CloseableHttpAsyncClient SHARED_CLIENT;
/**
* Returns the lazily-initialized shared async client. If it does not yet exist, it is
* created with a pooling connection manager and started.
*/
static CloseableHttpAsyncClient getSharedClient() {
CloseableHttpAsyncClient c = SHARED_CLIENT;
if (c != null) {
return c;
}
LOCK.lock();
try {
c = SHARED_CLIENT;
if (c == null) {
c = HttpAsyncClientBuilder.create()
.setConnectionManager(PoolingAsyncClientConnectionManagerBuilder.create()
.useSystemProperties()
.setMaxConnPerRoute(100)
.setMaxConnTotal(200)
.setMessageMultiplexing(true)
.build())
.useSystemProperties()
.evictExpiredConnections()
.evictIdleConnections(TimeValue.ofMinutes(1))
.build();
c.start();
SHARED_CLIENT = c;
}
return c;
} finally {
LOCK.unlock();
}
}
/**
* Creates a builder for a fully configurable {@link SseExecutor}.
*
* <p>Use this when you want to set defaults such as headers, backoff,
* parser strategy (char vs. byte), or custom executors for scheduling and callbacks.</p>
* @return a new {@link SseExecutorBuilder}
*/
public static SseExecutorBuilder custom() {
return new SseExecutorBuilder();
}
/**
* Creates an {@code SseExecutor} that uses a process-wide shared async client.
*
* <p>Streams opened by this executor will share one underlying {@link CloseableHttpAsyncClient}
* instance. {@link #close()} will be a no-op; call {@link #closeSharedClient()} to
* explicitly shut the shared client down (for tests / application shutdown).</p>
* @return a new {@link SseExecutor}
*/
public static SseExecutor newInstance() {
final CloseableHttpAsyncClient c = getSharedClient();
return new SseExecutor(c, true, null, null, EventSourceConfig.DEFAULT,
Collections.<String, String>emptyMap(), SseParser.CHAR);
}
/**
* Creates an {@code SseExecutor} using the caller-supplied async client.
*
* <p>The caller owns the lifecycle of the given client. {@link #close()} will close it.</p>
*
* @param client an already constructed async client
* @throws NullPointerException if {@code client} is {@code null}
* @throws IllegalStateException if the client is shutting down or shut down
* @return a new {@link SseExecutor}
*
*/
public static SseExecutor newInstance(final CloseableHttpAsyncClient client) {
Args.notNull(client, "HTTP Async Client");
final boolean isShared = client == SHARED_CLIENT;
return new SseExecutor(client, isShared, null, null, EventSourceConfig.DEFAULT,
Collections.<String, String>emptyMap(), SseParser.CHAR);
}
/**
* Closes and clears the shared async client, if present.
*
* <p>Useful for tests or orderly application shutdown.</p>
* @throws IOException if closing the client fails
*/
public static void closeSharedClient() throws IOException {
LOCK.lock();
try {
if (SHARED_CLIENT != null) {
SHARED_CLIENT.close();
SHARED_CLIENT = null;
}
} finally {
LOCK.unlock();
}
}
private final CloseableHttpAsyncClient client;
private final boolean isSharedClient;
private final ScheduledExecutorService defaultScheduler; // nullable
private final Executor defaultCallbackExecutor; // nullable
private final EventSourceConfig defaultConfig;
private final Map<String, String> defaultHeaders; // unmodifiable
private final SseParser defaultParser;
SseExecutor(final CloseableHttpAsyncClient client,
final boolean isSharedClient,
final ScheduledExecutorService defaultScheduler,
final Executor defaultCallbackExecutor,
final EventSourceConfig defaultConfig,
final Map<String, String> defaultHeaders,
final SseParser defaultParser) {
this.client = client;
this.isSharedClient = isSharedClient;
this.defaultScheduler = defaultScheduler;
this.defaultCallbackExecutor = defaultCallbackExecutor;
this.defaultConfig = defaultConfig != null ? defaultConfig : EventSourceConfig.DEFAULT;
this.defaultHeaders = defaultHeaders != null
? Collections.unmodifiableMap(new LinkedHashMap<>(defaultHeaders))
: Collections.emptyMap();
this.defaultParser = defaultParser != null ? defaultParser : SseParser.CHAR;
final IOReactorStatus status = client.getStatus();
if (status == IOReactorStatus.INACTIVE) {
client.start();
} else if (status == IOReactorStatus.SHUTTING_DOWN || status == IOReactorStatus.SHUT_DOWN) {
throw new IllegalStateException("Async client not usable: " + status);
}
}
/**
* Closes the underlying async client if this executor does <em>not</em> use
* the process-wide shared client. No-op otherwise.
* @throws IOException if closing the client fails
*/
public void close() throws IOException {
if (!isSharedClient) {
client.close();
}
}
/**
* Opens an {@link EventSource} with the executor's defaults (headers, config, parser, executors).
*
* @param uri target SSE endpoint (must produce {@code text/event-stream})
* @param listener event callbacks
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri, final EventSourceListener listener) {
return open(uri, this.defaultHeaders, listener, this.defaultConfig,
this.defaultParser, this.defaultScheduler, this.defaultCallbackExecutor);
}
/**
* Opens an {@link EventSource} overriding headers; other defaults are inherited.
*
* @param uri target SSE endpoint
* @param headers extra request headers (merged with executor defaults)
* @param listener event callbacks
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri,
final Map<String, String> headers,
final EventSourceListener listener) {
return open(uri, mergeHeaders(this.defaultHeaders, headers), listener, this.defaultConfig,
this.defaultParser, this.defaultScheduler, this.defaultCallbackExecutor);
}
/**
* Opens an {@link EventSource} overriding headers and reconnect policy; other defaults are inherited.
*
* @param uri target SSE endpoint
* @param headers extra request headers (merged with executor defaults)
* @param listener event callbacks
* @param config reconnect/backoff config
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri,
final Map<String, String> headers,
final EventSourceListener listener,
final EventSourceConfig config) {
return open(uri, mergeHeaders(this.defaultHeaders, headers), listener, config,
this.defaultParser, this.defaultScheduler, this.defaultCallbackExecutor);
}
/**
* Full-control open allowing a custom parser strategy and executors.
*
* @param uri target SSE endpoint
* @param headers request headers (not {@code null}, may be empty)
* @param listener event callbacks
* @param config reconnect/backoff config (uses {@link EventSourceConfig#DEFAULT} if {@code null})
* @param parser parsing strategy ({@link SseParser#CHAR} or {@link SseParser#BYTE})
* @param scheduler scheduler for reconnects (nullable ��� internal shared scheduler)
* @param callbackExecutor executor for listener callbacks (nullable ��� run inline)
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri,
final Map<String, String> headers,
final EventSourceListener listener,
final EventSourceConfig config,
final SseParser parser,
final ScheduledExecutorService scheduler,
final Executor callbackExecutor) {
return new DefaultEventSource(
client,
uri,
headers != null ? headers : Collections.<String, String>emptyMap(),
listener,
scheduler,
callbackExecutor,
config,
parser != null ? parser : this.defaultParser);
}
/**
* Returns the underlying {@link CloseableHttpAsyncClient}.
* @return the client
*/
public CloseableHttpAsyncClient getClient() {
return client;
}
private static Map<String, String> mergeHeaders(final Map<String, String> base, final Map<String, String> extra) {
if (base == null || base.isEmpty()) {
return extra != null ? extra : Collections.<String, String>emptyMap();
}
final LinkedHashMap<String, String> merged = new LinkedHashMap<>(base);
if (extra != null && !extra.isEmpty()) {
merged.putAll(extra);
}
return merged;
}
}