BroadcasterTest.java
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.tests.e2e.sse;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import javax.inject.Singleton;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseEventSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* JAX-RS {@link javax.ws.rs.sse.SseBroadcaster} test.
*
* @author Adam Lindenthal
*/
public class BroadcasterTest extends JerseyTest {
static final CountDownLatch closeLatch = new CountDownLatch(4);
static final CountDownLatch txLatch = new CountDownLatch(4);
private static boolean isSingleton = false;
private static int ASYNC_WAIT_TIMEOUT = 1000; //timeout for asynchronous events to complete activities
@Path("sse")
@Singleton
public static class SseResource {
private final Sse sse;
private SseBroadcaster broadcaster;
private OutboundSseEvent.Builder builder;
public SseResource(@Context final Sse sse) {
this.sse = sse;
broadcaster = sse.newBroadcaster();
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("events")
public void getServerSentEvents(@Context final SseEventSink eventSink, @Context final Sse sse) {
isSingleton = this.sse == sse;
builder = sse.newEventBuilder();
eventSink.send(builder.data("Event1").build());
eventSink.send(builder.data("Event2").build());
eventSink.send(builder.data("Event3").build());
broadcaster.register(eventSink);
broadcaster.onClose((subscriber) -> {
if (subscriber == eventSink) {
closeLatch.countDown();
}
});
txLatch.countDown();
}
@Path("push/{msg}")
@Produces(MediaType.SERVER_SENT_EVENTS)
@GET
public String pushMessage(@PathParam("msg") final String msg) {
broadcaster.broadcast(builder.data(msg).build());
txLatch.countDown();
return "Broadcasting message: " + msg;
}
@Path("close")
@GET
public String close() {
broadcaster.close();
return "Closed.";
}
}
/**
* Wrapper to hold results coming from events (including broadcast)
*
* @param <T> type of expected results
*/
public static class EventListWrapper<T> {
private final List<T> data; //event results
private final CountDownLatch eventCountDown; //count down delay for expected results
private final CountDownLatch broadcastLag = new CountDownLatch(1); //broadcast lag
// which shall be hold until thread is ready to process events from broadcast
private static final int LAG_INTERVAL = 1000; //broadcast lag timeout - in milliseconds (1s)
private static final int EXPECTED_REGULAR_EVENTS_COUNT = 3; //expected regular outbound events
public EventListWrapper(List<T> data, CountDownLatch eventCountDown) {
this.data = data;
this.eventCountDown = eventCountDown;
}
public void add(T msg) {
data.add(msg);
eventCountDown.countDown();
if (eventCountDown.getCount() == EXPECTED_REGULAR_EVENTS_COUNT) { //all regular events are received,
//ready for broadcast
broadcastLag.countDown();
}
}
public CountDownLatch getEventCountDown() {
return eventCountDown;
}
public T get(int pos) {
return data.get(pos);
}
public int size() {
return data.size();
}
/**
* makes current thread to wait for predefined interval until broadcast is ready
*
* @throws InterruptedException in case of something went wrong
*/
public boolean waitBroadcast() throws InterruptedException {
return broadcastLag.await(LAG_INTERVAL, TimeUnit.MILLISECONDS);
}
}
@Override
protected Application configure() {
final ResourceConfig rc = new ResourceConfig(SseResource.class);
rc.property(ServerProperties.WADL_FEATURE_DISABLE, true);
return rc;
}
@Test
public void test() throws InterruptedException {
final SseEventSource eventSourceA = SseEventSource.target(target().path("sse/events")).build();
final EventListWrapper<String> resultsA1 = new EventListWrapper(new ArrayList(), new CountDownLatch(5));
final EventListWrapper<String> resultsA2 = new EventListWrapper(new ArrayList(), new CountDownLatch(5));
eventSourceA.register(event -> resultsA1.add(event.readData()));
eventSourceA.register(event -> resultsA2.add(event.readData()));
eventSourceA.open();
Assertions.assertTrue(resultsA1.waitBroadcast()); //some delay is required to process consumer and producer
Assertions.assertTrue(resultsA2.waitBroadcast()); //some delay is required to process consumer and producer
target().path("sse/push/firstBroadcast").request().get(String.class);
final SseEventSource eventSourceB = SseEventSource.target(target().path("sse/events")).build();
final EventListWrapper<String> resultsB1 = new EventListWrapper(new ArrayList(), new CountDownLatch(4));
final EventListWrapper<String> resultsB2 = new EventListWrapper(new ArrayList(), new CountDownLatch(4));
eventSourceB.register(event -> resultsB1.add(event.readData()));
eventSourceB.register(event -> resultsB2.add(event.readData()));
eventSourceB.open();
Assertions.assertTrue(resultsB1.waitBroadcast()); //some delay is required to process consumer and producer
Assertions.assertTrue(resultsB2.waitBroadcast()); //some delay is required to process consumer and producer
target().path("sse/push/secondBroadcast").request().get(String.class);
Assertions.assertTrue(resultsA1.getEventCountDown().await(ASYNC_WAIT_TIMEOUT, TimeUnit.MILLISECONDS),
"Waiting for resultsA1 to be complete failed.");
Assertions.assertTrue(resultsA2.getEventCountDown().await(ASYNC_WAIT_TIMEOUT, TimeUnit.MILLISECONDS),
"Waiting for resultsA2 to be complete failed.");
Assertions.assertTrue(resultsB1.getEventCountDown().await(ASYNC_WAIT_TIMEOUT, TimeUnit.MILLISECONDS),
"Waiting for resultsB1 to be complete failed.");
Assertions.assertTrue(resultsB2.getEventCountDown().await(ASYNC_WAIT_TIMEOUT, TimeUnit.MILLISECONDS),
"Waiting for resultsB2 to be complete failed.");
Assertions.assertTrue(txLatch.await(5000, TimeUnit.MILLISECONDS));
// Event1, Event2, Event3, firstBroadcast, secondBroadcast
Assertions.assertEquals(5, resultsA1.size(), "resultsA1 does not contain 5 elements.");
Assertions.assertEquals(5, resultsA2.size(), "resultsA2 does not contain 5 elements.");
Assertions.assertTrue(resultsA1.get(0).equals("Event1")
&& resultsA1.get(1).equals("Event2")
&& resultsA1.get(2).equals("Event3")
&& resultsA1.get(3).equals("firstBroadcast")
&& resultsA1.get(4).equals("secondBroadcast"),
"resultsA1 does not contain expected data");
Assertions.assertTrue(resultsA2.get(0).equals("Event1")
&& resultsA2.get(1).equals("Event2")
&& resultsA2.get(2).equals("Event3")
&& resultsA2.get(3).equals("firstBroadcast")
&& resultsA2.get(4).equals("secondBroadcast"),
"resultsA2 does not contain expected data");
Assertions.assertEquals(4, resultsB1.size(), "resultsB1 does not contain 4 elements.");
Assertions.assertEquals(4, resultsB2.size(), "resultsB2 does not contain 4 elements.");
Assertions.assertTrue(resultsB1.get(0).equals("Event1")
&& resultsB1.get(1).equals("Event2")
&& resultsB1.get(2).equals("Event3")
&& resultsB1.get(3).equals("secondBroadcast"),
"resultsB1 does not contain expected data");
Assertions.assertTrue(resultsB2.get(0).equals("Event1")
&& resultsB2.get(1).equals("Event2")
&& resultsB2.get(2).equals("Event3")
&& resultsB2.get(3).equals("secondBroadcast"),
"resultsB2 does not contain expected data");
target().path("sse/close").request().get();
closeLatch.await();
Assertions.assertTrue(isSingleton, "Sse instances injected into resource and constructor differ. "
+ "Sse should have been injected as a singleton");
}
}