SSETest.java

/*
 * Copyright (c) 2020, 2022 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.tests.integration.jersey4507;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientLifecycleListener;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.examples.sse.jersey.App;
import org.glassfish.jersey.examples.sse.jersey.DomainResource;
import org.glassfish.jersey.examples.sse.jersey.ServerSentEventsResource;
import org.glassfish.jersey.media.sse.EventInput;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;

public class SSETest extends JerseyTest {
    private static final int MAX_CLIENTS = 10;
    private static final int COUNT = 30;
    private static final AtomicInteger atomicInteger = new AtomicInteger(0);
    private static final CountDownLatch closeLatch = new CountDownLatch(COUNT);

    @Override
    protected Application configure() {
        // enable(TestProperties.LOG_TRAFFIC);
        return new ResourceConfig(ServerSentEventsResource.class, DomainResource.class, SseFeature.class);
    }

    @Override
    protected void configureClient(ClientConfig config) {
        config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, MAX_CLIENTS + 2);
        config.register(new ClientRuntimeCloseVerifier());
    }

    /**
     * Test consuming multiple SSE events sequentially using event input.
     *
     * @throws Exception in case of a failure during the test execution.
     */
    public void testInboundEventReader() throws Exception {
        final int MAX_MESSAGES = 5;
        final CountDownLatch startLatch = new CountDownLatch(1);

        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<List<String>> futureMessages =
                    executor.submit(new Callable<List<String>>() {

                        @Override
                        public List<String> call() throws Exception {
                            final EventInput eventInput = target(App.ROOT_PATH).register(SseFeature.class)
                                    .request().get(EventInput.class);

                            startLatch.countDown();

                            final List<String> messages = new ArrayList<String>(MAX_MESSAGES);
                            try {
                                for (int i = 0; i < MAX_MESSAGES; i++) {
                                    InboundEvent event = eventInput.read();
                                    messages.add(event.readData());
                                }
                            } finally {
                                if (eventInput != null) {
                                    eventInput.close();
                                }
                            }

                            return messages;
                        }
                    });

            Assertions.assertTrue(startLatch.await(15000, TimeUnit.SECONDS),
                    "Waiting for receiver thread to start has timed out.");

            for (int i = 0; i < MAX_MESSAGES; i++) {
                target(App.ROOT_PATH).request().post(Entity.text("message " + i));
            }

            int i = 0;
            for (String message : futureMessages.get(50, TimeUnit.SECONDS)) {
                MatcherAssert.assertThat("Unexpected SSE event data value.", message, equalTo("message " + i++));
            }
        } finally {
            executor.shutdownNow();
        }
    }

    @Test
    public void testInboundEventReaderMultiple() throws Exception {
        for (int i = 0; i != COUNT; i++) {
            testInboundEventReader();
        }

        System.gc();
        closeLatch.await(15_000, TimeUnit.MILLISECONDS);
        // One ClientConfig is on the Client
        // + COUNT of them is created by .register(SseFeature.class)
        Assertions.assertEquals(COUNT + 1, atomicInteger.get());
        Assertions.assertEquals(0, closeLatch.getCount());
    }



    public static class ClientRuntimeCloseVerifier implements ClientLifecycleListener {

        @Override
        public void onInit() {
            atomicInteger.incrementAndGet();
        }

        @Override
        public void onClose() {
            closeLatch.countDown();
        }
    }
}