SsePerfServer.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse.example.performance;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
import org.apache.hc.core5.http.io.HttpRequestHandler;
import org.apache.hc.core5.http.io.entity.AbstractHttpEntity;
/**
* Scaled local SSE server (HTTP/1.1) implemented with Apache HttpComponents Core 5 classic server.
* <p>
* Endpoint: {@code /sse}
* <br>Query params:
* <ul>
* <li>{@code rate} ��� events/sec per connection (default: 20)</li>
* <li>{@code size} ��� payload size (bytes) inside {@code data:} (default: 64)</li>
* <li>{@code sync} ��� send an {@code event: sync} with server nano time every N seconds (default: 10; 0 disables)</li>
* </ul>
*
* <p><b>Run (IntelliJ):</b>
* <ul>
* <li><b>Program arguments:</b> {@code 8089}</li>
* <li><b>VM options (optional, GC/tuning):</b>
* {@code -Xss256k -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -Xms1g -Xmx1g}</li>
* </ul>
*
* <p>Example client test:
* <pre>
* curl -N "<a href="http://localhost:8089/sse?rate=50&size=64">...</a>"
* </pre>
*/
public final class SsePerfServer {
private SsePerfServer() {
}
public static void main(final String[] args) throws Exception {
final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8089;
final HttpRequestHandler sseHandler = (request, response, context) -> {
final URI uri = URI.create(request.getRequestUri());
final Map<String, String> q = parseQuery(uri.getRawQuery());
final int rate = parseInt(q.get("rate"), 20);
final int size = Math.max(1, parseInt(q.get("size"), 64));
final int syncSec = Math.max(0, parseInt(q.get("sync"), 10));
response.setCode(HttpStatus.SC_OK);
response.addHeader("Content-Type", "text/event-stream");
response.addHeader("Cache-Control", "no-cache");
response.addHeader("Connection", "keep-alive");
response.setEntity(new SseStreamEntity(rate, size, syncSec));
};
final HttpServer server = ServerBootstrap.bootstrap()
.setListenerPort(port)
.register("/sse", sseHandler)
.create();
Runtime.getRuntime().addShutdownHook(new Thread(server::stop, "sse-server-stop"));
server.start();
System.out.printf(Locale.ROOT, "[SSE-SERVER] listening on %d%n", port);
System.out.println("[SSE-SERVER] try: curl -N \"http://localhost:" + port + "/sse?rate=50&size=64\"");
}
/**
* Streaming entity that writes an infinite SSE stream with tight nanosecond scheduling.
*/
private static final class SseStreamEntity extends AbstractHttpEntity {
private final int rate;
private final int size;
private final int syncSec;
SseStreamEntity(final int rate, final int size, final int syncSec) {
super(ContentType.TEXT_EVENT_STREAM, null, true); // chunked
this.rate = rate;
this.size = size;
this.syncSec = syncSec;
}
@Override
public long getContentLength() {
return -1;
}
@Override
public void writeTo(final OutputStream outStream) throws IOException {
// buffered writes; still flush each event to keep latency low
final BufferedOutputStream os = new BufferedOutputStream(outStream, 8192);
// one-time random payload (base64) of requested size
final byte[] pad = new byte[size];
ThreadLocalRandom.current().nextBytes(pad);
final String padB64 = Base64.getEncoder().encodeToString(pad);
// initial sync with server monotonic time in nanoseconds
long nowNano = System.nanoTime();
writeAndFlush(os, "event: sync\ndata: tn=" + nowNano + "\n\n");
// schedule params
final long intervalNanos = (rate <= 0) ? 0L : (1_000_000_000L / rate);
long seq = 0L;
long next = System.nanoTime();
long nextSync = syncSec > 0 ? System.nanoTime() + TimeUnit.SECONDS.toNanos(syncSec) : Long.MAX_VALUE;
try {
while (!Thread.currentThread().isInterrupted()) {
nowNano = System.nanoTime();
// periodic sync tick
if (nowNano >= nextSync) {
writeAndFlush(os, "event: sync\ndata: tn=" + nowNano + "\n\n");
nextSync = nowNano + TimeUnit.SECONDS.toNanos(syncSec);
}
if (intervalNanos == 0L || nowNano >= next) {
// emit one event
final long tMs = System.currentTimeMillis();
final long tn = System.nanoTime();
final String frame =
"id: " + (++seq) + "\n" +
"event: m\n" +
"data: t=" + tMs + ",tn=" + tn + ",p=" + padB64 + "\n\n";
writeAndFlush(os, frame);
if (intervalNanos > 0L) {
// advance by exactly one period to avoid drift
next += intervalNanos;
// if we've fallen far behind (e.g. GC), realign to avoid bursts
if (nowNano - next > intervalNanos * 4L) {
next = nowNano + intervalNanos;
}
}
} else {
// tight, short sleep with nanosecond resolution
final long sleepNs = next - nowNano;
if (sleepNs > 0L) {
LockSupport.parkNanos(sleepNs);
}
}
}
} catch (final IOException closed) {
// client disconnected; finish quietly
}
}
@Override
public boolean isRepeatable() {
return false;
}
@Override
public InputStream getContent() throws IOException, UnsupportedOperationException {
return null;
}
@Override
public boolean isStreaming() {
return true;
}
@Override
public void close() throws IOException { /* no-op */ }
private static void writeAndFlush(final BufferedOutputStream os, final String s) throws IOException {
os.write(s.getBytes(StandardCharsets.UTF_8));
os.flush();
}
}
// -------- helpers --------
private static int parseInt(final String s, final int def) {
if (s == null) {
return def;
}
try {
return Integer.parseInt(s);
} catch (final Exception ignore) {
return def;
}
}
private static Map<String, String> parseQuery(final String raw) {
final Map<String, String> m = new HashMap<>();
if (raw == null || raw.isEmpty()) {
return m;
}
final String[] parts = raw.split("&");
for (final String part : parts) {
final int eq = part.indexOf('=');
if (eq > 0) {
m.put(urlDecode(part.substring(0, eq)), urlDecode(part.substring(eq + 1)));
}
}
return m;
}
private static String urlDecode(final String s) {
try {
return URLDecoder.decode(s, StandardCharsets.UTF_8.name());
} catch (final Exception e) {
return s;
}
}
}