DefaultEventSourceIntegrationTest.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.sse;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.sse.impl.SseParser;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorStatus;
import org.apache.hc.core5.util.TimeValue;
import org.junit.jupiter.api.Test;

final class DefaultEventSourceIntegrationTest {

    @Test
    void openStreamReceivesEventAndCloses() throws Exception {
        final CapturingClient fake = new CapturingClient();
        final SseExecutor exec = SseExecutor.newInstance(fake);

        final CountDownLatch opened = new CountDownLatch(1);
        final CountDownLatch got = new CountDownLatch(1);
        final CountDownLatch closed = new CountDownLatch(1);

        final EventSourceListener listener = new EventSourceListener() {
            @Override
            public void onOpen() {
                opened.countDown();
            }

            @Override
            public void onEvent(final String id, final String type, final String data) {
                if ("1".equals(id) && "ping".equals(type) && "hello".equals(data)) {
                    got.countDown();
                }
            }

            @Override
            public void onClosed() {
                closed.countDown();
            }
        };

        final EventSource es = exec.open(
                new URI("http://example.org/sse"),
                Collections.emptyMap(),
                listener,
                EventSourceConfig.DEFAULT,
                SseParser.BYTE,
                null,
                null);

        es.start();

        final AsyncResponseConsumer<Void> c = fake.lastConsumer;
        assertNotNull(c, "consumer captured");

        final HttpContext context = HttpClientContext.create();
        c.consumeResponse(
                new BasicHttpResponse(HttpStatus.SC_OK, "OK"),
                new TestEntityDetails("text/event-stream"),
                context,
                new FutureCallback<Void>() {
                    @Override
                    public void completed(final Void result) {
                    }

                    @Override
                    public void failed(final Exception ex) {
                    }

                    @Override
                    public void cancelled() {
                    }
                });

        c.consume(ByteBuffer.wrap("id: 1\nevent: ping\n".getBytes(StandardCharsets.UTF_8)));
        c.consume(ByteBuffer.wrap("data: hello\n\n".getBytes(StandardCharsets.UTF_8)));
        c.streamEnd(null);

        assertTrue(opened.await(1, TimeUnit.SECONDS), "opened");
        assertTrue(got.await(1, TimeUnit.SECONDS), "event received");

        es.cancel();
        assertTrue(closed.await(1, TimeUnit.SECONDS), "closed");

        exec.close();
    }

    // ---- fake async client that captures the consumer & callback via doExecute() ----
    static final class CapturingClient extends CloseableHttpAsyncClient {
        volatile AsyncResponseConsumer<Void> lastConsumer;
        volatile FutureCallback<Void> lastCallback;
        volatile boolean closed;

        @Override
        public void start() { /* no-op */ }

        @Override
        public IOReactorStatus getStatus() {
            return closed ? IOReactorStatus.SHUT_DOWN : IOReactorStatus.ACTIVE;
        }

        @Override
        public void awaitShutdown(final TimeValue waitTime) throws InterruptedException { /* no-op */ }

        @Override
        public void initiateShutdown() { /* no-op */ }

        @Override
        public void close(final CloseMode closeMode) {
            closed = true;
        }

        @Override
        public void close() {
            closed = true;
        }

        @Override
        protected <T> Future<T> doExecute(
                final HttpHost target,
                final AsyncRequestProducer requestProducer,
                final AsyncResponseConsumer<T> responseConsumer,
                final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
                final HttpContext context,
                final FutureCallback<T> callback) {

            @SuppressWarnings("unchecked") final AsyncResponseConsumer<Void> c = (AsyncResponseConsumer<Void>) responseConsumer;
            this.lastConsumer = c;

            @SuppressWarnings("unchecked") final FutureCallback<Void> cb = (FutureCallback<Void>) callback;
            this.lastCallback = cb;

            return new CompletableFuture<>(); // never completed; fine for this test
        }

        @Override
        @Deprecated
        public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
            // deprecated; not used
        }
    }

    // Minimal EntityDetails stub
    static final class TestEntityDetails implements EntityDetails {
        private final String ct;

        TestEntityDetails(final String ct) {
            this.ct = ct;
        }

        @Override
        public long getContentLength() {
            return -1;
        }

        @Override
        public String getContentType() {
            return ct;
        }

        @Override
        public String getContentEncoding() {
            return null;
        }

        @Override
        public boolean isChunked() {
            return true;
        }

        @Override
        public Set<String> getTrailerNames() {
            return Collections.<String>emptySet();
        }
    }
}