SseSubscriberTest.java

/*
 * Copyright (c) 2020, 2022 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.media.sse.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.inject.Singleton;
import javax.json.Json;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.SseEventSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.reactivex.Flowable;
import org.glassfish.jersey.internal.jsr166.Flow;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.jupiter.api.Test;

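/**
 * Tests that SSE resource methods can publish events of various payload types through an injected
 * {@link Flow.Subscriber}, and that a client {@link SseEventSource} receives them in the expected order.
 *
 * @author Daniel Kec
 */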
public class SseSubscriberTest extends JerseyTest {

    private static final int NUMBER_OF_TEST_MESSAGES = 5;
    private static final String TEST_MESSAGE = "Jersey";
    private static final JsonBuilderFactory JSON_BUILDER = Json.createBuilderFactory(Collections.emptyMap());

    /**
     * Builds the application under test, consisting solely of the {@link SseEndpoint} resource.
     *
     * @return resource configuration with {@link SseEndpoint} registered
     */
    @Override
    protected Application configure() {
        final ResourceConfig resourceConfig = new ResourceConfig(SseEndpoint.class);
        return resourceConfig;
    }

    /**
     * SSE resource exercised by the tests. Every method takes a {@link Flow.Subscriber} injected via
     * {@link Context} and subscribes it to a short RxJava {@link Flowable} of a single payload type.
     */
    @Singleton
    @Path("sse")
    public static class SseEndpoint {

        /** Period between two consecutive items of {@link #ticks()}, in milliseconds. */
        private static final long TICK_PERIOD_MILLIS = 20;

        /**
         * Source shared by the interval-driven endpoints: a periodic sequence of {@code Long} values
         * limited to {@code NUMBER_OF_TEST_MESSAGES} items.
         *
         * @return a new, not yet subscribed, flowable
         */
        private static Flowable<Long> ticks() {
            return Flowable.interval(TICK_PERIOD_MILLIS, TimeUnit.MILLISECONDS)
                    .take(NUMBER_OF_TEST_MESSAGES);
        }

        /**
         * Converts every UTF-16 code unit of the given text to a boxed {@link Character}.
         *
         * @param text text to split
         * @return the characters of {@code text}, in order
         */
        private static Character[] toCharacters(final String text) {
            return text.chars()
                    .mapToObj(codeUnit -> (char) codeUnit)
                    .toArray(Character[]::new);
        }

        /** Publishes each tick converted to {@link Short}. */
        @GET
        @Path("short")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseShort(@Context Flow.Subscriber<Short> subscriber) {
            final Flowable<Short> values = ticks().map(Long::shortValue);
            values.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes each tick converted to {@link Double}. */
        @GET
        @Path("double")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseDouble(@Context Flow.Subscriber<Double> subscriber) {
            final Flowable<Double> values = ticks().map(Long::doubleValue);
            values.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes each tick converted to {@link Byte}. */
        @GET
        @Path("byte")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseByte(@Context Flow.Subscriber<Byte> subscriber) {
            final Flowable<Byte> values = ticks().map(Long::byteValue);
            values.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes each tick converted to {@link Integer}. */
        @GET
        @Path("integer")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseInteger(@Context Flow.Subscriber<Integer> subscriber) {
            final Flowable<Integer> values = ticks().map(Long::intValue);
            values.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes a fixed list of {@link Long} values; unlike the other endpoints it is not interval-driven. */
        @GET
        @Path("long")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseLong(@Context Flow.Subscriber<Long> subscriber) {
            final Flowable<Long> values = Flowable.just(0L, 1L, 2L, 3L, 4L)
                    .take(NUMBER_OF_TEST_MESSAGES);
            values.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes {@code TEST_MESSAGE} suffixed with the tick value. */
        @GET
        @Path("string")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseString(@Context Flow.Subscriber<String> subscriber) {
            final Flowable<String> messages = ticks().map(tick -> TEST_MESSAGE + tick);
            messages.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes {@code true} for even ticks and {@code false} for odd ones. */
        @GET
        @Path("boolean")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseBoolean(@Context Flow.Subscriber<Boolean> subscriber) {
            final Flowable<Boolean> flags = ticks().map(tick -> (tick % 2) == 0);
            flags.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes the characters of the word {@code FRANK} one by one. */
        @GET
        @Path("char")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseChar(@Context Flow.Subscriber<Character> subscriber) {
            final Flowable<Character> letters = Flowable.just("FRANK")
                    .flatMap(word -> Flowable.fromArray(toCharacters(word)));
            letters.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes one {@link JsonObject} per tick, carrying {@code brand} and {@code model} properties. */
        @GET
        @Path("json-obj")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseJsonObj(@Context Flow.Subscriber<JsonObject> subscriber) {
            final Flowable<JsonObject> cars = ticks().map(tick -> JSON_BUILDER.createObjectBuilder()
                    .add("brand", TEST_MESSAGE)
                    .add("model", "Model " + tick)
                    .build());
            cars.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }

        /** Publishes one {@link Car} bean per tick. */
        @GET
        @Path("json")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void sseJson(@Context Flow.Subscriber<Car> subscriber) {
            final Flowable<Car> cars = ticks().map(tick -> new Car(TEST_MESSAGE, "Model " + tick));
            cars.subscribe(JerseyFlowAdapters.toSubscriber(subscriber));
        }
    }


    /**
     * Verifies that {@code sse/short} delivers the values 0 to 4 as {@link Short} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testShort() throws InterruptedException {
        final List<Short> expected = Arrays.asList((short) 0, (short) 1, (short) 2, (short) 3, (short) 4);
        final List<Short> actual = receive(Short.class, "sse/short");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/double} delivers the values 0.0 to 4.0 as {@link Double} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testDouble() throws InterruptedException {
        final List<Double> expected = Arrays.asList(0.0, 1.0, 2.0, 3.0, 4.0);
        final List<Double> actual = receive(Double.class, "sse/double");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/byte} delivers the values 0 to 4 as {@link Byte} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testByte() throws InterruptedException {
        final List<Byte> expected = Arrays.asList((byte) 0, (byte) 1, (byte) 2, (byte) 3, (byte) 4);
        final List<Byte> actual = receive(Byte.class, "sse/byte");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/integer} delivers the values 0 to 4 as {@link Integer} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testInteger() throws InterruptedException {
        final List<Integer> expected = Arrays.asList(0, 1, 2, 3, 4);
        final List<Integer> actual = receive(Integer.class, "sse/integer");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/boolean} delivers five {@link Boolean} events alternating between
     * {@code true} and {@code false}, starting with {@code true}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testBoolean() throws InterruptedException {
        final List<Boolean> expected = Arrays.asList(true, false, true, false, true);
        final List<Boolean> actual = receive(Boolean.class, "sse/boolean");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/long} delivers the values 0 to 4 as {@link Long} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testLong() throws InterruptedException {
        final List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
        final List<Long> actual = receive(Long.class, "sse/long");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/string} delivers {@code TEST_MESSAGE} suffixed with 0 to 4 as
     * {@link String} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testString() throws InterruptedException {
        final List<String> expected = Arrays.asList(
                TEST_MESSAGE + 0,
                TEST_MESSAGE + 1,
                TEST_MESSAGE + 2,
                TEST_MESSAGE + 3,
                TEST_MESSAGE + 4);
        final List<String> actual = receive(String.class, "sse/string");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/char} delivers the letters of {@code FRANK} as {@link Character} events, in order.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    @Test
    public void testChar() throws InterruptedException {
        final List<Character> expected = Arrays.asList('F', 'R', 'A', 'N', 'K');
        final List<Character> actual = receive(Character.class, "sse/char");
        assertEquals(expected, actual);
    }

    /**
     * Verifies that {@code sse/json-obj} delivers five JSON payloads which, read as strings and
     * deserialized with JSON-B, equal the expected {@link Car} instances, in order.
     *
     * @throws Exception if the thread is interrupted while waiting for the events,
     *                   or if closing the {@link Jsonb} instance fails
     */
    @Test
    public void testJsonObj() throws Exception {
        final List<Car> expected = Arrays.asList(
                new Car(TEST_MESSAGE, "Model 0"),
                new Car(TEST_MESSAGE, "Model 1"),
                new Car(TEST_MESSAGE, "Model 2"),
                new Car(TEST_MESSAGE, "Model 3"),
                new Car(TEST_MESSAGE, "Model 4"));

        // Jsonb is AutoCloseable; close it so the instance does not outlive the test.
        try (Jsonb jsonb = JsonbBuilder.create()) {
            final List<Car> actual = receive(String.class, "sse/json-obj")
                    .stream()
                    .map(payload -> jsonb.fromJson(payload, Car.class))
                    .collect(Collectors.toList());
            assertEquals(expected, actual);
        }
    }

    /**
     * Verifies that {@code sse/json} delivers five payloads which, read as strings and
     * deserialized with JSON-B, equal the expected {@link Car} instances, in order.
     *
     * @throws Exception if the thread is interrupted while waiting for the events,
     *                   or if closing the {@link Jsonb} instance fails
     */
    @Test
    public void testJson() throws Exception {
        final List<Car> expected = Arrays.asList(
                new Car(TEST_MESSAGE, "Model 0"),
                new Car(TEST_MESSAGE, "Model 1"),
                new Car(TEST_MESSAGE, "Model 2"),
                new Car(TEST_MESSAGE, "Model 3"),
                new Car(TEST_MESSAGE, "Model 4"));

        // Jsonb is AutoCloseable; close it so the instance does not outlive the test.
        try (Jsonb jsonb = JsonbBuilder.create()) {
            final List<Car> actual = receive(String.class, "sse/json")
                    .stream()
                    .map(payload -> jsonb.fromJson(payload, Car.class))
                    .collect(Collectors.toList());
            assertEquals(expected, actual);
        }
    }

    /**
     * Opens an SSE connection to the given path, collects {@code NUMBER_OF_TEST_MESSAGES} events and
     * returns their payloads in the order of arrival.
     * <p>
     * The event source is always closed before this method returns, so the connection is released and the
     * source can neither reconnect nor deliver further events once the caller starts asserting on the result.
     *
     * @param type payload type each event's data is read as
     * @param path request path of the SSE endpoint, relative to the test base URI
     * @param <T>  payload type
     * @return snapshot of the received payloads
     * @throws InterruptedException if the thread is interrupted while waiting for the events
     */
    private <T> List<T> receive(Class<T> type, String path) throws InterruptedException {
        final WebTarget sseTarget = target(path);

        // Written by the event-source thread, read by the test thread.
        final List<T> received = Collections.synchronizedList(new ArrayList<>(NUMBER_OF_TEST_MESSAGES));
        final CountDownLatch eventLatch = new CountDownLatch(NUMBER_OF_TEST_MESSAGES);

        try (SseEventSource eventSource = SseEventSource.target(sseTarget).build()) {
            eventSource.register(event -> {
                System.out.println("### Client received: " + event);
                received.add(event.readData(type));
                eventLatch.countDown();
            });
            eventSource.open();

            // Wait until the client has received the expected number of events.
            assertTrue(eventLatch.await(2, TimeUnit.SECONDS),
                    "Expected " + NUMBER_OF_TEST_MESSAGES + " events from '" + path + "' within 2 seconds");
        }

        // Hand out a copy so that the caller never observes a list that is still being modified.
        return new ArrayList<>(received);
    }

    /**
     * Simple mutable bean with a brand and a model, used as the JSON payload in the tests.
     * Two cars are equal when both properties are equal.
     */
    public static class Car {
        private String brand;
        private String model;

        /** Creates a car with both properties unset ({@code null}). */
        public Car() {
        }

        /**
         * Creates a car with the given properties.
         *
         * @param brand brand of the car
         * @param model model of the car
         */
        public Car(final String brand, final String model) {
            this.brand = brand;
            this.model = model;
        }

        /** @return brand of the car, possibly {@code null} */
        public String getBrand() {
            return this.brand;
        }

        /** @param brand new brand of the car */
        public void setBrand(final String brand) {
            this.brand = brand;
        }

        /** @return model of the car, possibly {@code null} */
        public String getModel() {
            return this.model;
        }

        /** @param model new model of the car */
        public void setModel(final String model) {
            this.model = model;
        }

        @Override
        public String toString() {
            return String.format("Car{brand='%s', model='%s'}", brand, model);
        }

        @Override
        public boolean equals(final Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            final Car other = (Car) o;
            final boolean sameBrand = Objects.equals(this.brand, other.brand);
            return sameBrand && Objects.equals(this.model, other.model);
        }

        @Override
        public int hashCode() {
            // Same value as Objects.hash(brand, model).
            final int brandPart = 31 + Objects.hashCode(brand);
            return 31 * brandPart + Objects.hashCode(model);
        }
    }
}