JerseyPublisherTest.java

/*
 * Copyright (c) 2017, 2022 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.internal.util;


import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.glassfish.jersey.internal.jsr166.Flow;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Test Jersey {@link Flow.Publisher} implementation, {@link JerseyPublisher}.
 *
 * @author Adam Lindenthal
 */
public class JerseyPublisherTest {

    @Test
    public void test() throws InterruptedException {

        final CountDownLatch openLatch1 = new CountDownLatch(1);
        final CountDownLatch openLatch2 = new CountDownLatch(1);
        final CountDownLatch openLatch3 = new CountDownLatch(1);

        final CountDownLatch writeLatch1 = new CountDownLatch(3);
        final CountDownLatch writeLatch2 = new CountDownLatch(2);
        final CountDownLatch writeLatch3 = new CountDownLatch(1);

        final CountDownLatch closeLatch = new CountDownLatch(3);

        final JerseyPublisher<String> publisher = new JerseyPublisher<>(JerseyPublisher.PublisherStrategy.BLOCKING);
        final PublisherTestSubscriber subscriber1 =
                new PublisherTestSubscriber("SUBSCRIBER-1", openLatch1, writeLatch1, closeLatch);
        final PublisherTestSubscriber subscriber2 =
                new PublisherTestSubscriber("SUBSCRIBER-2", openLatch2, writeLatch2, closeLatch);
        final PublisherTestSubscriber subscriber3 =
                new PublisherTestSubscriber("SUBSCRIBER-3", openLatch3, writeLatch3, closeLatch);

        publisher.publish("START");  // sent before any subscriber subscribed - should not be received

        publisher.subscribe(subscriber1);
        publisher.publish("Zero");   // before receive, but should be received by SUBSCRIBER-1
        assertTrue(openLatch1.await(200, TimeUnit.MILLISECONDS));

        subscriber1.receive(3);
        publisher.publish("One");    // should be received by SUBSCRIBER-1

        publisher.subscribe(subscriber2);
        assertTrue(openLatch2.await(200, TimeUnit.MILLISECONDS));
        subscriber2.receive(5);

        publisher.publish("Two");    // should be received by SUBSCRIBER-1 and SUBSCRIBER-2

        publisher.subscribe(subscriber3);
        assertTrue(openLatch3.await(200, TimeUnit.MILLISECONDS));
        subscriber3.receive(5);

        publisher.publish("Three");  // should be received by SUBSCRIBER-2 and SUBSCRIBER-3

        assertTrue(writeLatch1.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(writeLatch2.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(writeLatch3.await(1000, TimeUnit.MILLISECONDS));

        Queue<String> result = subscriber1.getReceivedData();
        assertEquals(3, result.size());
        assertEquals("Zero", result.poll());
        assertEquals("One", result.poll());
        assertEquals("Two", result.poll());

        result = subscriber2.getReceivedData();
        assertEquals(2, result.size());
        assertEquals("Two", result.poll());
        assertEquals("Three", result.poll());

        result = subscriber3.getReceivedData();
        assertEquals(1, result.size());
        assertEquals("Three", result.poll());

        publisher.close();
        subscriber1.receive(1);     // --> with this, the CDL is successfully counted down and await returns true
        assertTrue(closeLatch.await(10000, TimeUnit.MILLISECONDS));
        // this behaviour is a little counter-intuitive, but confirmed as correct by Flow.SubmissionPublisher author,
        // Dough Lea on the JDK mailing list
    }

    @Test
    public void testNonBlocking() throws InterruptedException {
        final int MSG_COUNT = 300;
        final int DATA_COUNT = Flow.defaultBufferSize() + 2;
        final int WAIT_TIME = 20 * DATA_COUNT;

        final JerseyPublisher<String> publisher = new JerseyPublisher<>();

        final CountDownLatch openLatchActive = new CountDownLatch(1);
        final CountDownLatch writeLatch = new CountDownLatch(DATA_COUNT);
        final CountDownLatch closeLatch = new CountDownLatch(1);

        final CountDownLatch openLatchDead = new CountDownLatch(1);

        final PublisherTestSubscriber deadSubscriber =
                new PublisherTestSubscriber("dead", openLatchDead, new CountDownLatch(0), new CountDownLatch(0));

        final PublisherTestSubscriber activeSubscriber =
                new PublisherTestSubscriber("active", openLatchActive, writeLatch, closeLatch);

        // subscribe to publisher
        publisher.subscribe(deadSubscriber);
        assertTrue(openLatchDead.await(200, TimeUnit.MILLISECONDS));
        publisher.subscribe(activeSubscriber);
        assertTrue(openLatchActive.await(200, TimeUnit.MILLISECONDS));

        activeSubscriber.receive(1000);

        AtomicInteger counter = new AtomicInteger(0);
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            int i = counter.getAndIncrement();

            if (i >= MSG_COUNT) {
                scheduledExecutorService.shutdown();
                return;
            }

            publisher.publish("MSG-" + i);
        }, 0, 10, TimeUnit.MILLISECONDS);

        writeLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
        assertTrue(writeLatch.getCount() <= 1);

        assertTrue(DATA_COUNT - activeSubscriber.getReceivedData().size() <= 1);
        assertEquals(0, deadSubscriber.getReceivedData().size());

        assertFalse(activeSubscriber.hasError());
        assertTrue(deadSubscriber.hasError());

        publisher.close();

        assertTrue(closeLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS));
        assertTrue(activeSubscriber.isCompleted());
        assertFalse(deadSubscriber.isCompleted());
    }

    class PublisherTestSubscriber implements Flow.Subscriber<String> {

        private final String name;
        private final CountDownLatch openLatch;
        private final CountDownLatch writeLatch;
        private final CountDownLatch closeLatch;
        private Flow.Subscription subscription;
        private final Queue<String> data;
        private boolean hasError = false;
        private boolean completed = false;

        PublisherTestSubscriber(final String name,
                                final CountDownLatch openLatch,
                                final CountDownLatch writeLatch,
                                final CountDownLatch closeLatch) {
            this.name = name;
            this.openLatch = openLatch;
            this.writeLatch = writeLatch;
            this.closeLatch = closeLatch;
            this.data = new ConcurrentLinkedQueue<>();
        }

        @Override
        public void onSubscribe(final Flow.Subscription subscription) {
            this.subscription = subscription;
            openLatch.countDown();
        }

        @Override
        public void onNext(final String item) {
            data.add(item);
            writeLatch.countDown();
        }

        @Override
        public void onError(final Throwable throwable) {
            throwable.printStackTrace();
            hasError = true;
        }

        @Override
        public void onComplete() {
            completed = true;
            closeLatch.countDown();
        }

        @Override
        public String toString() {
            return this.name + " " + Thread.currentThread().getName();
        }

        public void receive(final long n) {
            if (subscription != null) {
                subscription.request(n);
            }
        }

        /**
         * Retrieve stored (received) data for assertions.
         *
         * @return all the data received by subscriber.
         */
        Queue<String> getReceivedData() {
            return this.data;
        }

        boolean hasError() {
            return hasError;
        }

        boolean isCompleted() {
            return completed;
        }
    }
}