FutureCancelTest.java
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.tests.e2e.client.connector;
import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
import org.glassfish.jersey.apache5.connector.Apache5ConnectorProvider;
import org.glassfish.jersey.client.AbstractRxInvoker;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.glassfish.jersey.client.JerseyInvocation;
import org.glassfish.jersey.client.RequestEntityProcessing;
import org.glassfish.jersey.client.spi.ConnectorProvider;
import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
import org.glassfish.jersey.server.ChunkedOutput;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.CompletionStageRxInvoker;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;
/**
 * Checks that cancelling the {@link Future} returned by a reactive client invocation stops the
 * client from consuming a slowly streamed, chunked response. The test is repeated for every
 * connector returned by {@link #testData()}.
 */
public class FutureCancelTest extends JerseyTest {

    /** Pause in milliseconds between two chunks written by the resource and between two polls in the test. */
    public static final long SLEEP = 100L;

    /**
     * Chunks read by the client so far. The list is appended to on the client executor thread and
     * polled from the test thread, hence a thread-safe implementation.
     */
    private static final List<String> RX_LIST = new CopyOnWriteArrayList<>();

    /**
     * Supplies the arguments of the parameterized test.
     *
     * @return one provider instance per connector under test
     */
    public static List<ConnectorProvider> testData() {
        return Arrays.asList(
                new ApacheConnectorProvider(),
                new Apache5ConnectorProvider(),
                new HttpUrlConnectorProvider(),
                new NettyConnectorProvider()
        );
    }

    /** Resource that streams the numbers 0..99 as separate chunks, one every {@link #SLEEP} milliseconds. */
    @Path("/")
    public static class FutureCancelResource {

        /**
         * Starts a background thread that writes the chunks and returns the output immediately.
         *
         * @return the chunked output the background thread writes to
         */
        @GET
        public ChunkedOutput<String> sendData() {
            final ChunkedOutput<String> chunkedOutput = new ChunkedOutput<>(String.class);
            Thread newThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i != 100; i++) {
                            chunkedOutput.write(String.valueOf(i));
                            Thread.sleep(SLEEP);
                        }
                    } catch (InterruptedException e) {
                        // keep the interrupt visible to whoever owns this thread
                        Thread.currentThread().interrupt();
                    } catch (IOException | RuntimeException ignored) {
                        // a failed write means nobody is reading any more; stop producing
                    } finally {
                        try {
                            // terminate the response instead of leaving it open forever
                            chunkedOutput.close();
                        } catch (IOException ignored) {
                            // the connection is already gone; nothing left to release
                        }
                    }
                }
            });
            newThread.start();
            return chunkedOutput;
        }
    }

    @Override
    protected Application configure() {
        return new ResourceConfig(FutureCancelResource.class);
    }

    /**
     * Starts a streaming GET through the given connector, waits for the first chunks, cancels the
     * future and asserts that only a handful of chunks were ever received.
     *
     * @param connectorProvider connector implementation to run the request with
     * @throws InterruptedException if the test thread is interrupted while polling
     * @throws ExecutionException declared for signature compatibility
     */
    @ParameterizedTest
    @MethodSource("testData")
    public void testFutureCancel(ConnectorProvider connectorProvider) throws InterruptedException, ExecutionException {
        // The list is static: chunks left over from the previous connector must not satisfy this run.
        RX_LIST.clear();

        ClientConfig config = new ClientConfig();
        config.connectorProvider(connectorProvider);
        Future<List<String>> future = ClientBuilder.newClient(config)
                .register(new FutureCancelRxInvokerProvider())
                .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED)
                .target(target().getUri()).request().rx(FutureCancelRxInvoker.class).get().toCompletableFuture();

        int expectedSize = 2;
        while (RX_LIST.size() < expectedSize) {
            Thread.sleep(SLEEP);
        }
        future.cancel(true);

        Thread.sleep(2 * SLEEP); // wait to see no new messages arrive
        int size = RX_LIST.size(); // some might have arrived between RX_LIST.size() and cancel()
        while (size > expectedSize) { // keep polling until the count stops growing
            Thread.sleep(SLEEP);
            expectedSize = size;
            size = RX_LIST.size();
        }
        Assertions.assertTrue(size < 10, "Received " + size + " messages");
    }

    /** Provides {@link FutureCancelRxInvoker} instances that record every chunk they read in {@code RX_LIST}. */
    public static class FutureCancelRxInvokerProvider implements RxInvokerProvider<FutureCancelRxInvoker> {

        /** Reads the stream until end of input, adding the content of each successful read to {@code RX_LIST}. */
        Function<InputStream, Object> function = new Function<InputStream, Object>() {
            @Override
            public Object apply(InputStream inputStream) {
                byte[] number = new byte[8];
                try {
                    int len;
                    while ((len = inputStream.read(number)) != -1) {
                        // decode only the bytes of this read, with a fixed charset
                        RX_LIST.add(new String(number, 0, len, StandardCharsets.UTF_8));
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return RX_LIST;
            }
        };

        @Override
        public boolean isProviderFor(Class<?> clazz) {
            return FutureCancelRxInvoker.class.equals(clazz);
        }

        @Override
        public FutureCancelRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
            return new FutureCancelRxInvoker(syncInvoker, executorService, function);
        }
    }

    /**
     * Reactive invoker that runs a synchronous GET on the executor, hands the response stream to
     * the supplied function and registers the resulting future as cancellable on the invocation builder.
     */
    private static class FutureCancelRxInvoker extends AbstractRxInvoker<CompletionStage>
            implements CompletionStageRxInvoker {

        private final Function<InputStream, Object> consumer;

        public FutureCancelRxInvoker(SyncInvoker syncInvoker, ExecutorService executor,
                                     Function<InputStream, Object> consumer) {
            super(syncInvoker, executor);
            this.consumer = consumer;
        }

        /** Always performs a GET; {@code name}, {@code entity} and {@code responseType} are not used. */
        @Override
        public <R> CompletionStage method(String name, Entity<?> entity, Class<R> responseType) {
            return this.<R>invokeCancellable();
        }

        /** Always performs a GET; {@code name}, {@code entity} and {@code responseType} are not used. */
        @Override
        public <R> CompletionStage method(String name, Entity<?> entity, GenericType<R> responseType) {
            return this.<R>invokeCancellable();
        }

        /** Shared implementation of both {@code method} overloads so that they are cancellable alike. */
        private <R> CompletableFuture<R> invokeCancellable() {
            CompletableFuture<R> completableFuture = CompletableFuture.supplyAsync(new Supplier<R>() {
                @Override
                @SuppressWarnings("unchecked") // the consumer's result type is chosen by the caller of rx()
                public R get() {
                    Response r = getSyncInvoker().get();
                    InputStream is = r.readEntity(InputStream.class);
                    return (R) consumer.apply(is);
                }
            }, getExecutorService());
            ((JerseyInvocation.Builder) getSyncInvoker()).setCancellable(completableFuture);
            return completableFuture;
        }
    }
}