ClientSseH2Example.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.sse.example;

import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLSession;

import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.sse.EventSource;
import org.apache.hc.client5.http.sse.EventSourceConfig;
import org.apache.hc.client5.http.sse.EventSourceListener;
import org.apache.hc.client5.http.sse.SseExecutor;
import org.apache.hc.client5.http.sse.impl.ExponentialJitterBackoff;
import org.apache.hc.client5.http.sse.impl.SseParser;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.TimeValue;

/**
 * HTTP/2 SSE demo.
 * <p>
 * This example connects to an SSE endpoint using the async transport with HTTP/2 forced via TLS + ALPN,
 * probes the negotiated protocol, and then opens multiple SSE subscriptions concurrently to demonstrate
 * HTTP/2 stream multiplexing.
 * </p>
 *
 * <h2>Usage</h2>
 * <pre>
 *   ClientSseH2Example [uri] [streamCount] [maxEventsPerStream]
 * </pre>
 *
 * <p>
 * Defaults:
 * </p>
 * <ul>
 *   <li>{@code uri=https://stream.wikimedia.org/v2/stream/recentchange}</li>
 *   <li>{@code streamCount=4}</li>
 *   <li>{@code maxEventsPerStream=25}</li>
 * </ul>
 *
 * <h2>Notes</h2>
 * <ul>
 *   <li>HTTP/2 is enforced with {@link org.apache.hc.core5.http2.HttpVersionPolicy#FORCE_HTTP_2}. If the origin
 *       cannot negotiate H2, the connection attempt fails (no silent downgrade).</li>
 *   <li>The probe request prints {@code HTTP/2.0} when ALPN negotiation succeeds.</li>
 *   <li>With {@code maxConnPerRoute=1}, multiple active SSE subscriptions are only possible with HTTP/2
 *       multiplexing (each subscription is a separate H2 stream).</li>
 * </ul>
 */
public final class ClientSseH2Example {

    private static void probeProtocol(final CloseableHttpAsyncClient httpClient, final URI sseUri) throws Exception {
        final HttpHost target = new HttpHost(sseUri.getScheme(), sseUri.getHost(), sseUri.getPort());
        final HttpClientContext ctx = HttpClientContext.create();

        final SimpleHttpRequest req = SimpleRequestBuilder.get()
                .setHttpHost(target)
                .setPath("/")
                .build();

        final Future<SimpleHttpResponse> f = httpClient.execute(
                SimpleRequestProducer.create(req),
                SimpleResponseConsumer.create(),
                ctx,
                null);

        final SimpleHttpResponse resp = f.get(10, TimeUnit.SECONDS);

        System.out.println("[probe] " + req + " -> " + new StatusLine(resp));
        System.out.println("[probe] negotiated protocol: " + ctx.getProtocolVersion());

        final SSLSession sslSession = ctx.getSSLSession();
        if (sslSession != null) {
            System.out.println("[probe] TLS protocol: " + sslSession.getProtocol());
            System.out.println("[probe] TLS cipher: " + sslSession.getCipherSuite());
        }

        if (!HttpVersion.HTTP_2.equals(ctx.getProtocolVersion())) {
            System.out.println("[probe] WARNING: not HTTP/2 (server / proxy downgraded?)");
        }
    }

    public static void main(final String[] args) throws Exception {
        final URI uri = URI.create(args.length > 0
                ? args[0]
                : "https://stream.wikimedia.org/v2/stream/recentchange");

        final int streamCount = args.length > 1 ? Integer.parseInt(args[1]) : 4;
        final int maxEventsPerStream = args.length > 2 ? Integer.parseInt(args[2]) : 25;

        final IOReactorConfig ioCfg = IOReactorConfig.custom()
                .setIoThreadCount(Math.max(2, Runtime.getRuntime().availableProcessors()))
                .setSoKeepAlive(true)
                .setTcpNoDelay(true)
                .build();

        final PoolingAsyncClientConnectionManager connMgr =
                PoolingAsyncClientConnectionManagerBuilder.create()
                        .setMessageMultiplexing(true)
                        .setMaxConnPerRoute(1)
                        .setMaxConnTotal(4)
                        .setDefaultTlsConfig(TlsConfig.custom()
                                .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
                                .build())
                        .build();

        final CloseableHttpAsyncClient httpClient = HttpAsyncClientBuilder.create()
                .setIOReactorConfig(ioCfg)
                .setConnectionManager(connMgr)
                .setH2Config(H2Config.custom()
                        .setPushEnabled(false)
                        .setMaxConcurrentStreams(Math.max(64, streamCount * 8))
                        .build())
                .evictExpiredConnections()
                .evictIdleConnections(TimeValue.ofMinutes(1))
                .build();

        httpClient.start();
        probeProtocol(httpClient, uri);

        final ScheduledThreadPoolExecutor scheduler =
                new ScheduledThreadPoolExecutor(2, new DefaultThreadFactory("sse-backoff", true));
        scheduler.setRemoveOnCancelPolicy(true);

        final Executor callbacks = Runnable::run;

        final EventSourceConfig cfg = EventSourceConfig.builder()
                .backoff(new ExponentialJitterBackoff(500L, 30_000L, 2.0, 250L))
                .maxReconnects(-1)
                .build();

        final Map<String, String> defaultHeaders = new HashMap<>();
        defaultHeaders.put("User-Agent", "Apache-HttpClient-SSE/5.x");
        defaultHeaders.put("Accept-Language", "en");

        final SseExecutor exec = SseExecutor.custom()
                .setHttpClient(httpClient)
                .setScheduler(scheduler)
                .setCallbackExecutor(callbacks)
                .setEventSourceConfig(cfg)
                .setDefaultHeaders(defaultHeaders)
                .setParserStrategy(SseParser.BYTE)
                .build();

        final CountDownLatch done = new CountDownLatch(streamCount);
        final EventSource[] sources = new EventSource[streamCount];

        for (int i = 0; i < streamCount; i++) {
            final int idx = i;
            final AtomicInteger count = new AtomicInteger(0);

            final Map<String, String> headers = new HashMap<>();
            headers.put("X-Client-Stream", Integer.toString(idx));

            final EventSourceListener listener = new EventSourceListener() {

                @Override
                public void onOpen() {
                    System.out.printf(Locale.ROOT, "[SSE/%d] open: %s%n", idx, uri);
                }

                @Override
                public void onEvent(final String id, final String type, final String data) {
                    final int n = count.incrementAndGet();
                    final String shortData = data.length() > 120 ? data.substring(0, 120) + "���" : data;
                    System.out.printf(Locale.ROOT, "[SSE/%d] #%d %s id=%s %s%n",
                            idx, n, type != null ? type : "message", id, shortData);

                    if (n >= maxEventsPerStream) {
                        sources[idx].cancel();
                    }
                }

                @Override
                public void onClosed() {
                    System.out.printf(Locale.ROOT, "[SSE/%d] closed%n", idx);
                    done.countDown();
                }

                @Override
                public void onFailure(final Throwable t, final boolean willReconnect) {
                    System.err.printf(Locale.ROOT, "[SSE/%d] failure: %s willReconnect=%s%n",
                            idx, t, willReconnect);
                    if (!willReconnect) {
                        done.countDown();
                    }
                }
            };

            sources[i] = exec.open(uri, headers, listener, cfg, SseParser.BYTE, scheduler, callbacks);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            for (final EventSource es : sources) {
                if (es != null) {
                    es.cancel();
                }
            }
            try {
                exec.close();
            } catch (final Exception ignore) {
            }
            scheduler.shutdownNow();
        }, "sse-shutdown"));

        for (final EventSource es : sources) {
            es.start();
        }

        done.await();

        for (final EventSource es : sources) {
            es.cancel();
        }
        exec.close();
        scheduler.shutdownNow();
    }
}