ClientSseExample.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse.example;
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.sse.EventSource;
import org.apache.hc.client5.http.sse.EventSourceConfig;
import org.apache.hc.client5.http.sse.EventSourceListener;
import org.apache.hc.client5.http.sse.impl.ExponentialJitterBackoff;
import org.apache.hc.client5.http.sse.SseExecutor;
import org.apache.hc.client5.http.sse.impl.SseParser;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.TimeValue;
public final class ClientSseExample {
public static void main(final String[] args) throws Exception {
final URI uri = URI.create(args.length > 0
? args[0]
: "https://stream.wikimedia.org/v2/stream/recentchange");
// 1) IO & pool tuned for low latency + H2 multiplexing
final IOReactorConfig ioCfg = IOReactorConfig.custom()
.setIoThreadCount(Math.max(2, Runtime.getRuntime().availableProcessors()))
.setSoKeepAlive(true)
.setTcpNoDelay(true)
.build();
final PoolingAsyncClientConnectionManager connMgr =
PoolingAsyncClientConnectionManagerBuilder.create()
.setMessageMultiplexing(true) // HTTP/2 stream multiplexing
.setMaxConnPerRoute(32)
.setMaxConnTotal(256)
.setDefaultTlsConfig(
TlsConfig.custom()
.setVersionPolicy(HttpVersionPolicy.NEGOTIATE) // or FORCE_HTTP_2 / FORCE_HTTP_1
.build())
.build();
final CloseableHttpAsyncClient httpClient = HttpAsyncClientBuilder.create()
.setIOReactorConfig(ioCfg)
.setConnectionManager(connMgr)
.setH2Config(H2Config.custom()
.setPushEnabled(false)
.setMaxConcurrentStreams(256)
.build())
.evictExpiredConnections()
.evictIdleConnections(TimeValue.ofMinutes(1))
.build();
// 2) Scheduler for reconnects (multithreaded; cancels are purged)
final ScheduledThreadPoolExecutor scheduler =
new ScheduledThreadPoolExecutor(4, new DefaultThreadFactory("sse-backoff", true));
scheduler.setRemoveOnCancelPolicy(true);
// 3) Callback executor (direct = lowest latency; swap for a small pool if your handler is heavy)
final Executor callbacks = Runnable::run;
// 4) Default EventSource policy (backoff + unlimited retries)
final EventSourceConfig defaultCfg = EventSourceConfig.builder()
.backoff(new ExponentialJitterBackoff(500L, 30_000L, 2.0, 250L))
.maxReconnects(-1)
.build();
// 5) Default headers for all streams
final Map<String, String> defaultHeaders = new HashMap<>();
defaultHeaders.put("User-Agent", "Apache-HttpClient-SSE/5.x");
defaultHeaders.put("Accept-Language", "en");
// 6) Build SSE executor with BYTE parser (minimal allocations)
final SseExecutor exec = SseExecutor.custom()
.setHttpClient(httpClient)
.setScheduler(scheduler)
.setCallbackExecutor(callbacks)
.setEventSourceConfig(defaultCfg)
.setDefaultHeaders(defaultHeaders)
.setParserStrategy(SseParser.BYTE)
.build();
// 7) Listener
final CountDownLatch done = new CountDownLatch(1);
final EventSourceListener listener = new EventSourceListener() {
@Override
public void onOpen() {
System.out.println("[SSE] open: " + uri);
}
@Override
public void onEvent(final String id, final String type, final String data) {
final String shortData = data.length() > 120 ? data.substring(0, 120) + "���" : data;
System.out.printf(Locale.ROOT, "[SSE] %s id=%s %s%n",
type != null ? type : "message", id, shortData);
}
@Override
public void onClosed() {
System.out.println("[SSE] closed");
done.countDown();
}
@Override
public void onFailure(final Throwable t, final boolean willReconnect) {
System.err.println("[SSE] failure: " + t + " willReconnect=" + willReconnect);
if (!willReconnect) {
done.countDown();
}
}
};
// 8) Per-stream overrides (optional)
final Map<String, String> perStreamHeaders = new HashMap<>();
final EventSourceConfig perStreamCfg = EventSourceConfig.builder()
.backoff(new ExponentialJitterBackoff(750L, 20_000L, 2.0, 250L))
.maxReconnects(-1)
.build();
final EventSource es = exec.open(
uri,
perStreamHeaders,
listener,
perStreamCfg,
SseParser.BYTE,
scheduler,
callbacks
);
// Clean shutdown on Ctrl+C
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
es.cancel();
} catch (final Exception ignore) {
}
try {
exec.close();
} catch (final Exception ignore) {
}
try {
scheduler.shutdownNow();
} catch (final Exception ignore) {
}
}, "sse-shutdown"));
es.start();
done.await();
es.cancel();
exec.close();
scheduler.shutdownNow();
}
}