ItemStoreResourceITCase.java

/*
 * Copyright (c) 2013, 2022 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.tests.integration.servlet_3_sse_1;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Form;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.grizzly.connector.GrizzlyConnectorProvider;
import org.glassfish.jersey.media.sse.EventListener;
import org.glassfish.jersey.media.sse.EventSource;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.test.JerseyTest;
import org.glassfish.jersey.test.external.ExternalTestContainerFactory;
import org.glassfish.jersey.test.spi.TestContainerException;
import org.glassfish.jersey.test.spi.TestContainerFactory;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.describedAs;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Item store test.
 *
 * @author Marek Potociar
 */
@Disabled
public class ItemStoreResourceITCase extends JerseyTest {

    /** Logger used for the progress and diagnostic messages emitted by the tests and their helpers. */
    private static final Logger LOGGER = Logger.getLogger(ItemStoreResourceITCase.class.getName());
    /**
     * Number of {@link EventSource} instances each test creates and opens; the client async thread pool
     * size is configured as {@code MAX_LISTENERS + 1}.
     */
    private static final int MAX_LISTENERS = 5;
    /** Number of items posted in each of the two rounds of {@link #testEventSourceReconnect()}. */
    private static final int MAX_ITEMS = 10;


    /**
     * Supplies the application instance used by this test.
     *
     * @return a new {@link ItemStoreApp}.
     */
    @Override
    protected Application configure() {
        final Application itemStoreApp = new ItemStoreApp();
        return itemStoreApp;
    }

    /**
     * Selects the test container factory used by this test.
     *
     * @return a new {@link ExternalTestContainerFactory}.
     * @throws TestContainerException as declared by the overridden method.
     */
    @Override
    protected TestContainerFactory getTestContainerFactory() throws TestContainerException {
        final TestContainerFactory externalFactory = new ExternalTestContainerFactory();
        return externalFactory;
    }

    /**
     * Configures the test client: connect and read timeouts, an async thread pool with one thread more
     * than there are listeners, and the Grizzly connector.
     *
     * @param config client configuration to be updated.
     */
    @Override
    protected void configureClient(ClientConfig config) {
        // NOTE(review): timeout values are presumably milliseconds - confirm against ClientProperties docs.
        final int connectTimeout = 15000;
        final int readTimeout = 2000;
        final int asyncPoolSize = MAX_LISTENERS + 1;

        final ClientConfig withProperties = config
                .property(ClientProperties.CONNECT_TIMEOUT, connectTimeout)
                .property(ClientProperties.READ_TIMEOUT, readTimeout)
                .property(ClientProperties.ASYNC_THREADPOOL_SIZE, asyncPoolSize);
        withProperties.connectorProvider(new GrizzlyConnectorProvider());
    }

    /**
     * Computes the base URI of the tested application.
     * <p>
     * When the {@link ExternalTestContainerFactory} is in use, the {@code "resources"} path segment is
     * appended to the base URI supplied by the super class; otherwise that URI is used unchanged.
     * </p>
     *
     * @return base URI the test client targets.
     */
    @Override
    protected URI getBaseUri() {
        final UriBuilder builder = UriBuilder.fromUri(super.getBaseUri());
        if (getTestContainerFactory() instanceof ExternalTestContainerFactory) {
            return builder.path("resources").build();
        }
        return builder.build();
    }

    /**
     * Posts a fixed set of items to {@link ItemStoreResource} and verifies that the items are stored on the
     * server and that the addition events have been broadcast to every registered {@link EventSource}.
     * <p>
     * For each posted item, every listener is expected to receive one unnamed event carrying the item and
     * one {@code "size"} event.
     * </p>
     *
     * @throws Exception in case of a test failure.
     */
    @Test
    public void testItemsStore() throws Exception {
        final List<String> items = Collections.unmodifiableList(Arrays.asList("foo", "bar", "baz"));
        final WebTarget itemsTarget = target("items");
        final int expectedEventCount = items.size() * MAX_LISTENERS * 2;
        final CountDownLatch latch = new CountDownLatch(expectedEventCount); // countdown on all events
        final AtomicInteger sizeEventsCount = new AtomicInteger(0);
        final EventSource[] sources = new EventSource[MAX_LISTENERS];
        final List<Queue<Integer>> indexQueues = new ArrayList<>(MAX_LISTENERS);

        for (int sourceIndex = 0; sourceIndex < MAX_LISTENERS; sourceIndex++) {
            final int id = sourceIndex;
            final EventSource eventSource = EventSource.target(itemsTarget.path("events"))
                    .named("SOURCE " + id)
                    .build();
            sources[id] = eventSource;

            // indexes (within 'items') of the items this particular source has been notified about
            final Queue<Integer> receivedIndexes = new ConcurrentLinkedQueue<>();
            indexQueues.add(receivedIndexes);

            eventSource.register(new EventListener() {
                @Override
                @SuppressWarnings("MagicNumber")
                public void onEvent(InboundEvent inboundEvent) {
                    try {
                        final String eventName = inboundEvent.getName();
                        if (eventName == null) {
                            final String data = inboundEvent.readData();
                            LOGGER.info("[-i-] SOURCE " + id + ": Received event id=" + inboundEvent.getId() + " data=" + data);
                            receivedIndexes.add(items.indexOf(data));
                        } else if ("size".equals(eventName)) {
                            sizeEventsCount.incrementAndGet();
                        }
                    } catch (ProcessingException ex) {
                        LOGGER.log(Level.SEVERE, "[-x-] SOURCE " + id + ": Error getting event data.", ex);
                        // marker value that never matches a valid item index
                        receivedIndexes.add(-999);
                    } finally {
                        // every delivered event counts, whether or not it could be processed
                        latch.countDown();
                    }
                }
            });
        }

        try {
            open(sources);

            for (int itemIndex = 0; itemIndex < items.size(); itemIndex++) {
                postItem(itemsTarget, items.get(itemIndex));
            }

            final long awaitMillis =
                    (1000 + MAX_LISTENERS * EventSource.RECONNECT_DEFAULT) * getAsyncTimeoutMultiplier();
            final boolean allEventsReceived = latch.await(awaitMillis, TimeUnit.MILLISECONDS);
            assertTrue(allEventsReceived, "Waiting to receive all events has timed out.");

            // need to force disconnect on server in order for EventSource.close(...) to succeed with HttpUrlConnection
            sendCommand(itemsTarget, "disconnect");
        } finally {
            close(sources);
        }

        // all posted items must be retrievable from the server
        final String postedItems = itemsTarget.request().get(String.class);
        for (String item : items) {
            assertTrue(postedItems.contains(item), "Item '" + item + "' not stored on server.");
        }

        // every source must have seen each item exactly once
        for (int queueId = 0; queueId < indexQueues.size(); queueId++) {
            final Queue<Integer> receivedIndexes = indexQueues.get(queueId);
            for (int i = 0; i < items.size(); i++) {
                assertTrue(receivedIndexes.contains(i),
                        "Event for '" + items.get(i) + "' not received in queue " + queueId);
            }
            assertEquals(items.size(), receivedIndexes.size(),
                    "Not received the expected number of events in queue " + queueId);
        }

        final int expectedSizeEvents = items.size() * MAX_LISTENERS;
        assertEquals(expectedSizeEvents, sizeEventsCount.get(), "Number of received 'size' events does not match.");
    }

    /**
     * Test the {@link EventSource} reconnect feature.
     * <p>
     * The test runs in two rounds of {@code MAX_ITEMS} posted items each. In the first round a
     * {@code "disconnect"} command is sent after every posted item. Then a {@code "reconnect <delay>"}
     * command followed by another {@code "disconnect"} is sent, the test sleeps for the reconnect delay,
     * posts the second round of items and finally sends {@code "reconnect now"}. Every source is expected
     * to have received each item of both rounds exactly once.
     * </p>
     *
     * @throws Exception in case of a test failure.
     */
    @Test
    public void testEventSourceReconnect() throws Exception {
        final WebTarget itemsTarget = target("items");
        final CountDownLatch latch = new CountDownLatch(MAX_ITEMS * MAX_LISTENERS * 2); // countdown only on new item events
        final List<Queue<String>> receivedQueues = new ArrayList<>(MAX_LISTENERS);
        final EventSource[] sources = new EventSource[MAX_LISTENERS];

        for (int i = 0; i < MAX_LISTENERS; i++) {
            final int id = i;
            final EventSource es = EventSource.target(itemsTarget.path("events")).named("SOURCE " + id).build();
            sources[id] = es;

            final Queue<String> received = new ConcurrentLinkedQueue<>();
            receivedQueues.add(received);

            es.register(new EventListener() {
                @Override
                public void onEvent(InboundEvent inboundEvent) {
                    if (inboundEvent.getName() != null) {
                        // only unnamed (new item) events are recorded and counted
                        return;
                    }
                    try {
                        final String data = inboundEvent.readData();
                        LOGGER.info("[-i-] SOURCE " + id + ": Received event id=" + inboundEvent.getId() + " data=" + data);
                        received.add(data);
                    } catch (ProcessingException ex) {
                        LOGGER.log(Level.SEVERE, "[-x-] SOURCE " + id + ": Error getting event data.", ex);
                        received.add("[data processing error]");
                    } finally {
                        // Count down only after the event (or the error marker) has been recorded, so that
                        // the thread released from latch.await(...) is guaranteed to see complete queues.
                        latch.countDown();
                    }
                }
            });
        }

        final String[] postedItems = new String[MAX_ITEMS * 2];
        try {
            open(sources);

            // round 1: force a disconnect after every posted item
            for (int i = 0; i < MAX_ITEMS; i++) {
                final String item = String.format("round-1-%02d", i);
                postItem(itemsTarget, item);
                postedItems[i] = item;
                sendCommand(itemsTarget, "disconnect");
                Thread.sleep(100);
            }

            final int reconnectDelay = 1;
            sendCommand(itemsTarget, "reconnect " + reconnectDelay);
            sendCommand(itemsTarget, "disconnect");

            Thread.sleep(reconnectDelay * 1000);

            // round 2: post the remaining items without forcing disconnects in between
            for (int i = 0; i < MAX_ITEMS; i++) {
                final String item = String.format("round-2-%02d", i);
                postedItems[i + MAX_ITEMS] = item;
                postItem(itemsTarget, item);
            }

            sendCommand(itemsTarget, "reconnect now");

            assertTrue(latch.await((1 + MAX_LISTENERS * (MAX_ITEMS + 1) * reconnectDelay) * getAsyncTimeoutMultiplier(),
                    TimeUnit.SECONDS), "Waiting to receive all events has timed out.");

            // need to force disconnect on server in order for EventSource.close(...) to succeed with HttpUrlConnection
            sendCommand(itemsTarget, "disconnect");
        } finally {
            close(sources);
        }

        final String storedItems = itemsTarget.request().get(String.class);
        for (String item : postedItems) {
            assertThat("Posted item '" + item + "' stored on server", storedItems, containsString(item));
        }

        int sourceId = 0;
        for (Queue<String> queue : receivedQueues) {
            assertThat("Received events in source " + sourceId, queue,
                    describedAs("Collection containing %0", hasItems(postedItems), Arrays.asList(postedItems).toString()));
            assertThat("Size of received queue for source " + sourceId, queue.size(), equalTo(postedItems.length));
            sourceId++;
        }
    }

    /**
     * Posts a new item to the item store as a form with a single {@code "name"} parameter and asserts
     * that the server has responded with the 204 status code.
     *
     * @param itemsTarget web target the item form is posted to.
     * @param item name of the item to be posted.
     */
    private static void postItem(final WebTarget itemsTarget, final String item) {
        final Response response = itemsTarget.request().post(Entity.form(new Form("name", item)));
        try {
            assertEquals(204, response.getStatus(), "Posting new item has failed.");
        } finally {
            // release the underlying connection even if the assertion fails
            response.close();
        }
        LOGGER.info("[-i-] POSTed item: '" + item + "'");
    }

    /**
     * Opens all the supplied event sources in array order, logging each one once it has been opened.
     *
     * @param sources event sources to be opened.
     */
    private static void open(final EventSource[] sources) {
        for (int index = 0; index < sources.length; index++) {
            sources[index].open();
            LOGGER.info("[-->] SOURCE " + index + " opened.");
        }
    }

    /**
     * Closes all the supplied event sources that are still open, waiting up to one second for each.
     * <p>
     * Every open source is attempted even if an earlier one fails to close in time, so that a single
     * timeout does not leave the remaining sources open. The assertion is evaluated only after all
     * the sources have been processed. The logged number is the position of the source in the array.
     * </p>
     *
     * @param sources event sources to be closed.
     */
    private static void close(final EventSource[] sources) {
        boolean allClosed = true;
        for (int index = 0; index < sources.length; index++) {
            final EventSource source = sources[index];
            if (!source.isOpen()) {
                continue;
            }
            if (source.close(1, TimeUnit.SECONDS)) {
                LOGGER.info("[<--] SOURCE " + index + " closed.");
            } else {
                LOGGER.warning("[-x-] SOURCE " + index + " has not closed in time.");
                allClosed = false;
            }
        }
        assertTrue(allClosed, "Waiting to close a source has timed out.");
    }

    /**
     * Posts a plain-text command to the {@code "commands"} sub-resource of the item store and asserts
     * that the server has responded with the 200 status code.
     *
     * @param itemsTarget item store web target; the command is posted to its {@code "commands"} path.
     * @param command text of the command to be sent.
     */
    private static void sendCommand(final WebTarget itemsTarget, final String command) {
        final Response response = itemsTarget.path("commands").request().post(Entity.text(command));
        try {
            assertEquals(200, response.getStatus(), "'" + command + "' command has failed.");
        } finally {
            // the response entity is never read, so close the response to release the connection
            response.close();
        }
        LOGGER.info("[-!-] COMMAND '" + command + "' has been processed.");
    }
}