SsePerfClient.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse.example.performance;
import java.net.URI;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.sse.EventSource;
import org.apache.hc.client5.http.sse.EventSourceListener;
import org.apache.hc.client5.http.sse.SseExecutor;
import org.apache.hc.client5.http.sse.impl.SseParser;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.TimeValue;
/**
 * Scaled SSE client harness with nano-time calibration and batched ramp-up.
 * <p>
 * Args (positional, each optional with a built-in default):
 * {@code uri connections durationSec parser(BYTE|CHAR) h2(true|false) openBatch openBatchPauseMs}
 * <p>
 * Examples:
 * <pre>{@code
 * # Local server @ 50 eps per conn, 64B payload:
 * # java ... SsePerfServer 8089
 * java ... SsePerfClient "http://localhost:8089/sse?rate=50&size=64" 2000 120 BYTE false 200 100
 *
 * # External SSE (H2 negotiation):
 * java ... SsePerfClient https://stream.wikimedia.org/v2/stream/recentchange 1000 120 BYTE true 100 200
 * }</pre>
 */
public final class SsePerfClient {
/**
 * Entry point of the load harness: opens the requested number of SSE streams in batches,
 * prints one metrics line per second while they run, then shuts everything down.
 *
 * @param args optional positional arguments:
 *             {@code uri connections durationSec parser h2 openBatch openBatchPauseMs};
 *             any missing trailing argument falls back to its default
 * @throws IllegalArgumentException if {@code connections} is negative, or a numeric or
 *                                  parser argument cannot be parsed
 * @throws Exception if the main thread is interrupted or closing the clients fails
 */
public static void main(final String[] args) throws Exception {
    final URI uri = URI.create(args.length > 0 ? args[0] : "http://localhost:8089/sse?rate=20&size=64");
    final int connections = args.length > 1 ? Integer.parseInt(args[1]) : 200;
    final int durationSec = args.length > 2 ? Integer.parseInt(args[2]) : 60;
    final SseParser parser = args.length > 3 ? SseParser.valueOf(args[3]) : SseParser.BYTE;
    final boolean h2 = args.length > 4 && Boolean.parseBoolean(args[4]);
    // A batch size below 1 would never advance the ramp-up loop (it would spin forever),
    // so treat such values as "one connection per batch".
    final int openBatch = Math.max(1, args.length > 5 ? Integer.parseInt(args[5]) : 200);
    final int openBatchPauseMs = args.length > 6 ? Integer.parseInt(args[6]) : 100;
    if (connections < 0) {
        // Fail before any client resources are created.
        throw new IllegalArgumentException("connections must be >= 0: " + connections);
    }

    System.out.printf(Locale.ROOT,
            "Target=%s%nConnections=%d Duration=%ds Parser=%s H2=%s Batch=%d Pause=%dms%n",
            uri, connections, durationSec, parser, h2, openBatch, openBatchPauseMs);

    // --- Client & pool tuned for fan-out ---
    final IOReactorConfig ioCfg = IOReactorConfig.custom()
            .setIoThreadCount(Math.max(2, Runtime.getRuntime().availableProcessors()))
            .setSoKeepAlive(true)
            .setTcpNoDelay(true)
            .build();

    final PoolingAsyncClientConnectionManager connMgr =
            PoolingAsyncClientConnectionManagerBuilder.create()
                    .useSystemProperties()
                    .setMessageMultiplexing(true) // enable H2 multiplexing if negotiated
                    .setMaxConnPerRoute(Math.max(64, connections))
                    .setMaxConnTotal(Math.max(128, connections))
                    .setDefaultTlsConfig(
                            TlsConfig.custom()
                                    .setVersionPolicy(h2 ? HttpVersionPolicy.NEGOTIATE : HttpVersionPolicy.FORCE_HTTP_1)
                                    .build())
                    .build();

    final CloseableHttpAsyncClient httpClient = HttpAsyncClientBuilder.create()
            .setIOReactorConfig(ioCfg)
            .setConnectionManager(connMgr)
            .setH2Config(H2Config.custom()
                    .setPushEnabled(false)
                    .setMaxConcurrentStreams(512)
                    .build())
            .useSystemProperties()
            .evictExpiredConnections()
            .evictIdleConnections(TimeValue.ofMinutes(1))
            .build();

    final ScheduledThreadPoolExecutor scheduler =
            new ScheduledThreadPoolExecutor(Math.min(8, Math.max(2, Runtime.getRuntime().availableProcessors())),
                    new DefaultThreadFactory("sse-perf-backoff", true));
    scheduler.setRemoveOnCancelPolicy(true);

    // Callbacks run inline on whichever thread delivers them.
    final Executor callbacks = Runnable::run;

    // --- Metrics ---
    final AtomicInteger openCount = new AtomicInteger();
    final AtomicInteger connectedNow = new AtomicInteger();
    final AtomicLong events = new AtomicLong();
    final AtomicLong reconnects = new AtomicLong();
    final AtomicLong failures = new AtomicLong();
    final LogHistogram latencyNs = new LogHistogram();

    // --- SSE executor ---
    final SseExecutor exec = SseExecutor.custom()
            .setHttpClient(httpClient)
            .setScheduler(scheduler)
            .setCallbackExecutor(callbacks)
            .setParserStrategy(parser)
            .build();

    // "started" is counted down synchronously right after each start() call, so it only
    // tracks that start() was issued, not that the stream is connected.
    final CountDownLatch started = new CountDownLatch(connections);
    final CountDownLatch done = new CountDownLatch(connections);

    try {
        // --- Open connections in batches to avoid thundering herd ---
        int opened = 0;
        while (opened < connections) {
            final int toOpen = Math.min(openBatch, connections - opened);
            for (int i = 0; i < toOpen; i++) {
                final EventSource es = exec.open(uri,
                        newListener(events, reconnects, failures, openCount, connectedNow, latencyNs, done));
                es.start();
                started.countDown();
            }
            opened += toOpen;
            if (opened < connections && openBatchPauseMs > 0) {
                Thread.sleep(openBatchPauseMs);
            }
        }

        final long startMs = System.currentTimeMillis();
        final ScheduledFuture<?> reporter = scheduler.scheduleAtFixedRate(new Runnable() {
            // State carried between ticks to derive the per-interval event rate.
            long lastEvents = 0;
            long lastTs = System.currentTimeMillis();

            @Override
            public void run() {
                final long now = System.currentTimeMillis();
                final long ev = events.get();
                final long deltaE = ev - lastEvents;
                final long deltaMs = Math.max(1L, now - lastTs); // never divide by zero
                final double eps = (deltaE * 1000.0) / deltaMs;
                // The histogram is cumulative, so percentiles cover the whole run so far.
                final LogHistogram.Snapshot s = latencyNs.snapshot();
                final long p50us = s.percentile(50) / 1000;
                final long p95us = s.percentile(95) / 1000;
                final long p99us = s.percentile(99) / 1000;
                // \u00b5 is the micro sign; written as an escape so the source file
                // encoding cannot corrupt it.
                System.out.printf(Locale.ROOT,
                        "t=+%4ds con=%d open=%d ev=%d (%.0f/s) rec=%d fail=%d p50=%d\u00b5s p95=%d\u00b5s p99=%d\u00b5s%n",
                        (int) ((now - startMs) / 1000),
                        connectedNow.get(), openCount.get(), ev, eps,
                        reconnects.get(), failures.get(),
                        p50us, p95us, p99us);
                lastEvents = ev;
                lastTs = now;
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);

        // --- Run for duration ---
        try {
            started.await();
            Thread.sleep(Math.max(1, durationSec) * 1000L);
        } finally {
            reporter.cancel(true);
        }
    } finally {
        // --- Shutdown: always release the scheduler and both clients, even on failure ---
        scheduler.shutdownNow();
        try {
            exec.close();
        } finally {
            httpClient.close();
        }
    }

    // Give the listeners a bounded amount of time to report onClosed.
    done.await(5, TimeUnit.SECONDS);
    System.out.println("DONE");
}
/**
 * Creates the per-stream listener that feeds the shared metrics.
 * <p>
 * Latency for a regular event is taken from the first source available:
 * <ol>
 *   <li>the {@code tn=} field of the payload, corrected by the offset learned from the
 *       last {@code sync} event on this stream;</li>
 *   <li>the {@code t=} field, compared against {@link System#currentTimeMillis()};</li>
 *   <li>the time elapsed since the previous arrival on this stream.</li>
 * </ol>
 * Non-positive latencies are not recorded.
 *
 * @param events       incremented for every non-{@code sync} event
 * @param reconnects   incremented for every failure that will be retried
 * @param failures     incremented for every failure
 * @param openCount    incremented on every {@code onOpen}
 * @param connectedNow gauge of streams currently open; each stream contributes at most 1
 * @param latencyNs    histogram receiving latency samples in nanoseconds
 * @param done         counted down when the stream reports {@code onClosed}
 * @return a new listener holding its own calibration state
 */
private static EventSourceListener newListener(
        final AtomicLong events,
        final AtomicLong reconnects,
        final AtomicLong failures,
        final AtomicInteger openCount,
        final AtomicInteger connectedNow,
        final LogHistogram latencyNs,
        final CountDownLatch done) {
    return new EventSourceListener() {
        // True while this stream is counted in connectedNow. Guards the gauge so that an
        // open/failure/open sequence (reconnect) or a close without a prior open cannot
        // push it out of balance.
        private final AtomicBoolean connected = new AtomicBoolean();

        // Per-stream calibration state
        volatile boolean calibrated;
        volatile long nanoOffset; // clientNano - serverNano
        volatile long lastArrivalNs;

        @Override
        public void onOpen() {
            openCount.incrementAndGet();
            if (connected.compareAndSet(false, true)) {
                connectedNow.incrementAndGet();
            }
            lastArrivalNs = System.nanoTime();
            // Each (re)connection starts uncalibrated until a "sync" event arrives.
            calibrated = false;
            nanoOffset = 0L;
        }

        @Override
        public void onEvent(final String id, final String type, final String data) {
            final long nowNano = System.nanoTime();
            if ("sync".equals(type)) {
                // Calibration event: remember the offset between the local nano clock and
                // the "tn=" value it carries. Sync events are not counted as traffic.
                final long syncNano = parseFieldLong(data, "tn=");
                if (syncNano > 0) {
                    nanoOffset = nowNano - syncNano;
                    calibrated = true;
                }
                return;
            }
            events.incrementAndGet();
            // Prefer monotonic tn if calibrated
            final long sentNano = parseFieldLong(data, "tn=");
            if (calibrated && sentNano > 0) {
                recordIfPositive(nowNano - (sentNano + nanoOffset));
            } else {
                // Fallbacks
                final long sentMs = parseFieldLong(data, "t=");
                if (sentMs > 0) {
                    recordIfPositive((System.currentTimeMillis() - sentMs) * 1_000_000L);
                } else {
                    recordIfPositive(nowNano - lastArrivalNs);
                }
            }
            lastArrivalNs = nowNano;
        }

        @Override
        public void onClosed() {
            markDisconnected();
            done.countDown();
        }

        @Override
        public void onFailure(final Throwable t, final boolean willReconnect) {
            failures.incrementAndGet();
            if (willReconnect) {
                reconnects.incrementAndGet();
            }
            // NOTE(review): assumes a failure means the underlying connection is gone;
            // a later onOpen (reconnect) will count the stream again.
            markDisconnected();
        }

        /** Removes this stream from the connected gauge if it is currently counted. */
        private void markDisconnected() {
            if (connected.compareAndSet(true, false)) {
                connectedNow.decrementAndGet();
            }
        }

        /** Records a latency sample, skipping non-positive values. */
        private void recordIfPositive(final long nanos) {
            if (nanos > 0) {
                latencyNs.recordNanos(nanos);
            }
        }
    };
}
/**
 * Extracts the non-negative decimal number that directly follows {@code keyEq} in
 * {@code data}.
 * <p>
 * The key must start at the beginning of {@code data} or be preceded by a character that
 * is neither a letter, a digit nor an underscore, so that looking up {@code "t="} does
 * not accidentally match the tail of a longer key such as {@code "cnt="}.
 *
 * @param data  payload to search; may be {@code null}
 * @param keyEq key including its trailing {@code '='}, e.g. {@code "tn="}
 * @return the parsed value, or {@code -1} if {@code data} is {@code null}, the key is
 *         absent, no digits follow it, or the digits do not fit in a {@code long}
 */
private static long parseFieldLong(final String data, final String keyEq) {
    if (data == null) {
        return -1;
    }
    // Find the first occurrence of the key that sits on a key boundary.
    int keyAt = data.indexOf(keyEq);
    while (keyAt > 0) {
        final char before = data.charAt(keyAt - 1);
        if (!Character.isLetterOrDigit(before) && before != '_') {
            break;
        }
        keyAt = data.indexOf(keyEq, keyAt + 1);
    }
    if (keyAt < 0) {
        return -1;
    }
    final int start = keyAt + keyEq.length();
    int end = start;
    while (end < data.length()) {
        final char c = data.charAt(end);
        if (c < '0' || c > '9') {
            break;
        }
        end++;
    }
    if (end == start) {
        // Key present but no digits follow it.
        return -1;
    }
    try {
        return Long.parseLong(data.substring(start, end));
    } catch (final NumberFormatException ignored) {
        // Only reachable when the digit run overflows a long.
        return -1;
    }
}
// ---- Self-contained log2 histogram in nanoseconds ----
/**
 * Histogram of nanosecond values using power-of-two buckets.
 * <p>
 * Bucket {@code i} counts values whose highest set bit is bit {@code i}, i.e. values in
 * {@code [2^i, 2^(i+1) - 1]}; non-positive values are counted in bucket 0 as well.
 * Counters are {@link LongAdder}s, so recording is safe from multiple threads.
 */
static final class LogHistogram {
    private static final int BUCKET_COUNT = 64;

    private final LongAdder[] buckets = new LongAdder[BUCKET_COUNT];

    LogHistogram() {
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new LongAdder();
        }
    }

    /**
     * Adds one sample.
     *
     * @param v value in nanoseconds; values {@code <= 0} are counted in the lowest bucket
     */
    void recordNanos(final long v) {
        // For a positive long the highest set bit is in [0, 62], so the index is always
        // within the array and needs no clamping.
        final int idx = v <= 0 ? 0 : 63 - Long.numberOfLeadingZeros(v);
        buckets[idx].increment();
    }

    /**
     * Copies the current bucket counts.
     *
     * @return a snapshot whose total is the sum of the copied counts
     */
    Snapshot snapshot() {
        final long[] c = new long[buckets.length];
        long total = 0;
        for (int i = 0; i < c.length; i++) {
            c[i] = buckets[i].sum();
            total += c[i];
        }
        return new Snapshot(c, total);
    }

    /** Point-in-time copy of the bucket counts together with their sum. */
    static final class Snapshot {
        final long[] counts;
        final long total;

        Snapshot(final long[] counts, final long total) {
            this.counts = counts;
            this.total = total;
        }

        /**
         * Estimates a percentile as the inclusive upper bound of the bucket that contains
         * the requested rank.
         *
         * @param p percentile in the range 0..100; out-of-range values are treated as the
         *          nearest valid rank
         * @return the bucket upper bound in nanoseconds, or {@code 0} if the snapshot is empty
         */
        long percentile(final double p) {
            if (total == 0) {
                return 0;
            }
            // Clamp to [1, total]: p <= 0 maps to the first sample, and p > 100 (or double
            // rounding on very large totals) maps to the last one instead of overshooting.
            final long raw = (long) Math.ceil((p / 100.0) * total);
            final long rank = Math.max(1L, Math.min(raw, total));
            long cum = 0;
            for (int i = 0; i < counts.length; i++) {
                cum += counts[i];
                if (cum >= rank) {
                    return upperBound(i);
                }
            }
            return Long.MAX_VALUE;
        }

        /** Largest value that falls into bucket {@code i}. */
        private static long upperBound(final int i) {
            // 2^(i+1) - 1 does not fit in a long from bucket 62 upwards.
            return i >= 62 ? Long.MAX_VALUE : (1L << (i + 1)) - 1;
        }
    }
}
}