SsePerfServer.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse.example;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
/**
* Minimal SSE server for benchmarking.
* <p>
* Uses the JDK built-in {@code com.sun.net.httpserver.HttpServer} (HTTP/1.1) and emits SSE events at a fixed rate.
* Each connection gets its own scheduled emitter task.
* </p>
*
* <h2>Usage</h2>
* <pre>
* SsePerfServer [port] [eventsPerSecond] [payloadBytes] [path]
* </pre>
*
* <p>
* Defaults: {@code port=8080}, {@code eventsPerSecond=50}, {@code payloadBytes=32}, {@code path=/events}.
* </p>
*
* <p>
* Note: This is intentionally tiny and not tuned for huge fanout. For large-scale fanout, use a shared
* publisher loop and write to multiple connections (or use HttpCore5 server bootstrap).
* </p>
*/
public final class SsePerfServer {
private static int intArg(final String[] args, final int idx, final int def) {
return args.length > idx ? Integer.parseInt(args[idx]) : def;
}
private static String payload(final int bytes) {
if (bytes <= 0) {
return "ok";
}
final StringBuilder sb = new StringBuilder(bytes);
while (sb.length() < bytes) {
sb.append('x');
}
return sb.toString();
}
public static void main(final String[] args) throws Exception {
final int port = intArg(args, 0, 8080);
final int eventsPerSecond = intArg(args, 1, 50);
final int payloadBytes = intArg(args, 2, 32);
final String path = args.length > 3 ? args[3] : "/events";
final HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
Math.max(2, Runtime.getRuntime().availableProcessors()));
final AtomicLong idSeq = new AtomicLong(0);
final String dataPayload = payload(payloadBytes);
final long periodNanos;
if (eventsPerSecond > 0) {
final long p = 1_000_000_000L / (long) eventsPerSecond;
periodNanos = Math.max(1L, p);
} else {
periodNanos = 1_000_000_000L;
}
server.createContext(path, exchange -> {
if (!"GET".equalsIgnoreCase(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(405, -1);
exchange.close();
return;
}
final Headers h = exchange.getResponseHeaders();
h.add("Content-Type", "text/event-stream; charset=utf-8");
h.add("Cache-Control", "no-cache");
h.add("Connection", "keep-alive");
h.add("Access-Control-Allow-Origin", "*");
exchange.sendResponseHeaders(200, 0);
final OutputStream out = exchange.getResponseBody();
try {
out.write((": connected " + Instant.now().toString() + "\n\n").getBytes(StandardCharsets.UTF_8));
out.flush();
} catch (final IOException ex) {
try {
out.close();
} catch (final IOException ignore) {
}
exchange.close();
return;
}
final Runnable emitter = () -> {
final long id = idSeq.incrementAndGet();
final String msg =
"id: " + id + "\n" +
"event: message\n" +
"data: " + dataPayload + "\n\n";
try {
out.write(msg.getBytes(StandardCharsets.UTF_8));
out.flush();
} catch (final IOException ex) {
throw new RuntimeException(ex);
}
};
final ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
emitter,
0L,
periodNanos,
TimeUnit.NANOSECONDS);
try {
// Blocks until cancelled or the task terminates (e.g. client disconnect -> IOException).
future.get();
} catch (final CancellationException ignore) {
// expected on shutdown / cancel
} catch (final ExecutionException ignore) {
// expected on client disconnect (write fails)
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
} finally {
future.cancel(true);
try {
out.close();
} catch (final IOException ignore) {
}
exchange.close();
}
});
server.setExecutor(Executors.newCachedThreadPool());
server.start();
System.out.printf(Locale.ROOT,
"SsePerfServer listening on http://localhost:%d%s (rate=%d/s payload=%d)%n",
port, path, eventsPerSecond, payloadBytes);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
server.stop(0);
scheduler.shutdownNow();
}, "sse-perfserver-shutdown"));
}
}