// ExecutorServiceProviderTest.java
/*
* Copyright (c) 2013, 2022 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.tests.e2e;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import org.glassfish.jersey.client.ClientAsyncExecutor;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
import org.glassfish.jersey.server.ManagedAsync;
import org.glassfish.jersey.server.ManagedAsyncExecutor;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* {@link org.glassfish.jersey.spi.ExecutorServiceProvider} E2E tests.
*
* @author Marek Potociar
*/
public class ExecutorServiceProviderTest extends JerseyTest {
/**
 * Root resource mapped to {@code resource}, producing {@code text/plain}.
 * It exposes one plain endpoint and one {@link ManagedAsync} endpoint.
 */
@Path("resource")
@Produces("text/plain")
public static class TestResourceA {

    /**
     * Plain {@code GET resource} endpoint.
     *
     * @return the fixed entity {@code "resource"}.
     */
    @GET
    public String getSync() {
        return "resource";
    }

    /**
     * {@code GET resource/async} endpoint annotated with {@link ManagedAsync}.
     *
     * @return {@code "async-resource-"} followed by the outcome of the
     *         thread-name check performed on the invoking thread.
     */
    @GET
    @Path("async")
    @ManagedAsync
    public String getAsync() {
        // The check inspects the name of the thread that is running this method.
        final String threadCheck = getResponseOnThread();
        return "async-resource-" + threadCheck;
    }
}
/**
 * Second resource class bound to the same {@code resource} root path.
 * It is kept apart from {@link TestResourceA} to ensure that the named
 * {@link ExecutorService} injection is only performed for the
 * {@code /resource/custom-async} path.
 */
@Path("resource")
@Produces("text/plain")
public static class TestResourceB {

    /** Executor injected under the {@code "custom"} name. */
    @Inject
    @Named("custom")
    ExecutorService executorService;

    /**
     * {@code GET resource/custom-async} endpoint; hands the resumption of the
     * suspended response over to the injected executor.
     *
     * @param asyncResponse suspended response that is resumed from the submitted task.
     */
    @GET
    @Path("custom-async")
    public void getCustomAsync(@Suspended final AsyncResponse asyncResponse) {
        // Block-bodied lambda typed as Runnable, so submit(Runnable) is the overload invoked.
        final Runnable resumeTask = () -> {
            final String threadCheck = getResponseOnThread();
            asyncResponse.resume("custom-async-resource-" + threadCheck);
        };
        executorService.submit(resumeTask);
    }
}
/**
 * Checks whether the calling thread's name starts with {@code "async-request-"}.
 *
 * @return {@code "passed"} when the prefix matches, otherwise an error text
 *         that includes the actual thread name.
 */
static String getResponseOnThread() {
    final String currentName = Thread.currentThread().getName();
    return currentName.startsWith("async-request-")
            ? "passed"
            : "error - unexpected custom thread name: " + currentName;
}
/**
 * {@link ExecutorServiceProvider} that hands out {@link CustomExecutorService} instances and keeps
 * statistics about them: how many were created, how many were shut down, and which ones are still
 * live. The test methods read these statistics directly.
 */
@ClientAsyncExecutor
@ManagedAsyncExecutor
@Named("custom")
public static class CustomExecutorProvider implements ExecutorServiceProvider {

    /** Executors created by this provider that have not been shut down yet (identity-based set). */
    private final Set<ExecutorService> executors = Collections.newSetFromMap(new IdentityHashMap<>());
    /** Number of {@link CustomExecutorService} instances constructed since the last {@link #reset()}. */
    private volatile int executorCreationCount = 0;
    /** Number of {@link CustomExecutorService} instances shut down since the last {@link #reset()}. */
    private volatile int executorReleaseCount = 0;

    /**
     * Shuts down every executor that is still tracked and zeroes all statistics.
     */
    public void reset() {
        // Iterate over a snapshot: shutdownNow() removes the executor from 'executors'
        // (see tryCleanUp()), and mutating the identity-map backed set while iterating it
        // directly fails with ConcurrentModificationException once more than one executor is live.
        for (ExecutorService executor : executors.toArray(new ExecutorService[0])) {
            executor.shutdownNow();
        }
        executors.clear();
        executorCreationCount = 0;
        executorReleaseCount = 0;
    }

    /**
     * Creates a new tracked executor whose threads are named {@code async-request-%d}.
     *
     * @return a new {@link CustomExecutorService}.
     */
    @Override
    public ExecutorService getExecutorService() {
        return new CustomExecutorService();
    }

    /**
     * Releases the given executor by calling {@link ExecutorService#shutdownNow()} on it.
     *
     * @param executorService executor to be shut down.
     */
    @Override
    public void dispose(final ExecutorService executorService) {
        executorService.shutdownNow();
    }

    /**
     * Delegating {@link ExecutorService} that registers itself with the enclosing provider on
     * construction and unregisters itself on the first {@code shutdown()} / {@code shutdownNow()}.
     */
    /* package private */ class CustomExecutorService implements ExecutorService {

        private final ExecutorService delegate;
        /** Guards the release bookkeeping so that it runs at most once per executor. */
        private final AtomicBoolean isCleanedUp;

        /**
         * Creates an executor backed by a single-threaded delegate with
         * {@code async-request-%d} thread names.
         */
        public CustomExecutorService() {
            this(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
                    .setNameFormat("async-request-%d")
                    .setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
                    .build()));
        }

        /**
         * Creates an executor backed by the given delegate and records its creation.
         *
         * @param delegate executor that performs the actual work.
         */
        public CustomExecutorService(ExecutorService delegate) {
            this.isCleanedUp = new AtomicBoolean(false);
            this.delegate = delegate;
            executorCreationCount++;
            executors.add(this);
        }

        @Override
        public void shutdown() {
            tryCleanUp();
            delegate.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            tryCleanUp();
            return delegate.shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return delegate.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return delegate.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return delegate.submit(task);
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return delegate.submit(task, result);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return delegate.submit(task);
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            return delegate.invokeAll(tasks);
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException {
            return delegate.invokeAll(tasks, timeout, unit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            return delegate.invokeAny(tasks);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.invokeAny(tasks, timeout, unit);
        }

        /**
         * Removes this executor from the provider's set and bumps the release counter;
         * only the first call has any effect.
         */
        private void tryCleanUp() {
            if (isCleanedUp.compareAndSet(false, true)) {
                executors.remove(this);
                executorReleaseCount++;
            }
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }
}
public static class SecondCustomExecutorProvider extends CustomExecutorProvider {
public static final String NAME_FORMAT = "second-async-request";
public ExecutorService getExecutorService() {
return new CustomExecutorService(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat(NAME_FORMAT + "-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.build()));
}
}
/**
 * Provider instance registered with the server-side application in {@code configure()}.
 * It is static and therefore shared by all test methods, which is why tests call
 * {@code reset()} on it before inspecting its statistics.
 */
private static final CustomExecutorProvider serverExecutorProvider = new CustomExecutorProvider();
/**
 * Builds the server-side application: both test resources plus the shared
 * {@code serverExecutorProvider} instance.
 *
 * @return the application configuration used by the test container.
 */
@Override
protected Application configure() {
    // Uncomment for debugging:
    // enable(TestProperties.LOG_TRAFFIC);
    // enable(TestProperties.DUMP_ENTITY);
    final ResourceConfig resourceConfig = new ResourceConfig(TestResourceA.class, TestResourceB.class);
    return resourceConfig.register(serverExecutorProvider);
}
/**
 * Reproducer for JERSEY-2205 (client-side): a provider registered on the client must not create
 * an executor for a synchronous call, must create exactly one for an asynchronous call, and that
 * executor must be released once the client is closed.
 *
 * @throws Exception in case of a test error.
 */
@Test
public void testCustomClientExecutorsInjectionAndReleasing() throws Exception {
    final CustomExecutorProvider provider = new CustomExecutorProvider();
    final Client client = ClientBuilder.newClient().register(provider);

    // synchronous request: nothing may be created or released yet
    final Response syncResponse = client.target(getBaseUri()).path("resource").request().get();
    assertEquals(200, syncResponse.getStatus());
    assertEquals("resource", syncResponse.readEntity(String.class));
    assertClientExecutorState(provider, 0, 0, 0);

    // asynchronous request: exactly one executor is created and still live
    final Future<Response> pending = client.target(getBaseUri()).path("resource").request().async().get();
    final Response asyncResponse = pending.get();
    assertEquals(200, asyncResponse.getStatus());
    assertEquals("resource", asyncResponse.readEntity(String.class));
    assertClientExecutorState(provider, 1, 0, 1);

    // closing the client has to release that executor without creating another one
    client.close();
    assertClientExecutorState(provider, 1, 1, 0);
}

/**
 * Asserts the three statistics kept by a client-side provider.
 *
 * @param provider provider under inspection.
 * @param created  expected number of created executors.
 * @param released expected number of released executors.
 * @param stored   expected number of executors still held in the provider's set.
 */
private static void assertClientExecutorState(final CustomExecutorProvider provider,
                                              final int created,
                                              final int released,
                                              final int stored) {
    assertEquals(created, provider.executorCreationCount, "Unexpected number of created client executors");
    assertEquals(released, provider.executorReleaseCount, "Unexpected number of released client executors");
    assertEquals(stored, provider.executors.size(), "Unexpected number of client executors stored in the set.");
}
/**
 * Reproducer for JERSEY-2205 (server-side): the server provider must not create an executor for
 * a synchronous resource method, must create exactly one for the managed-async method, and that
 * executor must be released when the test container is stopped.
 *
 * @throws Exception in case of a test error.
 */
@Test
public void testCustomServerExecutorsInjectionAndReleasing() throws Exception {
    // the provider is shared between test methods, so start from clean statistics
    serverExecutorProvider.reset();

    // synchronous resource method: nothing may be created or released yet
    final Response syncResponse = target("resource").request().get();
    assertEquals(200, syncResponse.getStatus());
    assertEquals("resource", syncResponse.readEntity(String.class));
    assertServerExecutorState(0, 0, 0);

    // managed-async resource method: exactly one executor is created and still live
    final Response asyncResponse = target("resource/async").request().get();
    assertEquals(200, asyncResponse.getStatus());
    assertEquals("async-resource-passed", asyncResponse.readEntity(String.class));
    assertServerExecutorState(1, 0, 1);

    // stopping the test container has to release that executor without creating another one
    tearDown();
    assertServerExecutorState(1, 1, 0);

    // bring the container back so that the regular post-test tearDown finds it running
    setUp();
}

/**
 * Asserts the three statistics kept by the shared server-side provider.
 *
 * @param created  expected number of created executors.
 * @param released expected number of released executors.
 * @param stored   expected number of executors still held in the provider's set.
 */
private static void assertServerExecutorState(final int created, final int released, final int stored) {
    assertEquals(created, serverExecutorProvider.executorCreationCount,
            "Unexpected number of created server executors");
    assertEquals(released, serverExecutorProvider.executorReleaseCount,
            "Unexpected number of released server executors");
    assertEquals(stored, serverExecutorProvider.executors.size(),
            "Unexpected number of server executors stored in the set.");
}
/**
 * Tests that the executor injected under the {@code "custom"} name is created once for the
 * {@code resource/custom-async} call and released when the test container is stopped.
 *
 * @throws Exception in case of a test error.
 */
@Test
public void testCustomNamedServerExecutorsInjectionAndReleasing() throws Exception {
    final CustomExecutorProvider stats = serverExecutorProvider;
    stats.reset();

    final Response customAsyncResponse = target("resource/custom-async").request().get();
    assertEquals(200, customAsyncResponse.getStatus());
    final String entity = customAsyncResponse.readEntity(String.class);
    assertEquals("custom-async-resource-passed", entity);

    // one executor exists and it is still live
    assertEquals(1, stats.executorCreationCount, "Unexpected number of created server executors");
    assertEquals(0, stats.executorReleaseCount, "Unexpected number of released server executors");
    assertEquals(1, stats.executors.size(), "Unexpected number of server executors stored in the set.");

    // stopping the test container
    tearDown();

    // that single executor has been released and no further one was created
    assertEquals(1, stats.executorCreationCount, "Unexpected number of created server executors");
    assertEquals(1, stats.executorReleaseCount, "Unexpected number of released server executors");
    assertEquals(0, stats.executors.size(), "Unexpected number of server executors stored in the set.");

    // re-starting test container to ensure proper post-test tearDown
    setUp();
}
/**
 * Checks that the executor passed to {@code ClientBuilder.executorService(...)} runs the async
 * request even though a {@link CustomExecutorProvider} is registered on the builder afterwards.
 * The request filter records the name of the thread it is invoked on.
 *
 * @throws Exception in case of a test error.
 */
@Test
public void testClientBuilderExecutorServiceTakesPrecedenceOverRegistered() throws Exception {
    serverExecutorProvider.reset();
    final CountDownLatch nameLatch = new CountDownLatch(1);
    final Set<String> threadName = new HashSet<>(1);
    final CustomExecutorProvider executorProvider = new CustomExecutorProvider();
    // Kept in a local so that the test can shut it down itself when it is done.
    final ExecutorService builderExecutor = new SecondCustomExecutorProvider().getExecutorService();
    final Client client = ClientBuilder.newBuilder().register(new ClientRequestFilter() {
        @Override
        public void filter(ClientRequestContext requestContext) throws IOException {
            threadName.add(Thread.currentThread().getName());
            nameLatch.countDown();
        }
    }).executorService(builderExecutor).register(executorProvider).build();
    try {
        client.target(getBaseUri()).path("resource").request().async().get(String.class).get();
        assertTrue(nameLatch.await(10, TimeUnit.SECONDS), "Client request filter was not invoked in time");
        // expected value first, actual value second
        assertEquals(1, threadName.size(), "Unexpected number of recorded filter thread names");
        assertTrue(threadName.iterator().next().startsWith(SecondCustomExecutorProvider.NAME_FORMAT),
                "Request was not processed by the executor set on the ClientBuilder");
        tearDown(); // stopping test container
    } finally {
        // Release the client and the executor even when an assertion above fails.
        client.close();
        // NOTE(review): not relying on client.close() to stop a builder-supplied executor;
        // a repeated shutdownNow() on CustomExecutorService only re-invokes the delegate's shutdownNow().
        builderExecutor.shutdownNow();
    }
    setUp(); // re-starting test container to ensure proper post-test tearDown.
}
/**
 * Checks that the executor passed to {@code ClientBuilder.executorService(...)} runs the async
 * request even though a {@link CustomExecutorProvider} was registered on the builder beforehand.
 * The request filter records the name of the thread it is invoked on.
 *
 * @throws Exception in case of a test error.
 */
@Test
public void testRegisteredExecutorServiceDoesNotTakesPrecedenceOverClientBuilder() throws Exception {
    serverExecutorProvider.reset();
    final CountDownLatch nameLatch = new CountDownLatch(1);
    final Set<String> threadName = new HashSet<>(1);
    final CustomExecutorProvider executorProvider = new CustomExecutorProvider();
    // Kept in a local so that the test can shut it down itself when it is done.
    final ExecutorService builderExecutor = new SecondCustomExecutorProvider().getExecutorService();
    final Client client = ClientBuilder.newBuilder().register(executorProvider).register(new ClientRequestFilter() {
        @Override
        public void filter(ClientRequestContext requestContext) throws IOException {
            threadName.add(Thread.currentThread().getName());
            nameLatch.countDown();
        }
    }).executorService(builderExecutor).build();
    try {
        client.target(getBaseUri()).path("resource").request().async().get(String.class).get();
        assertTrue(nameLatch.await(10, TimeUnit.SECONDS), "Client request filter was not invoked in time");
        // expected value first, actual value second
        assertEquals(1, threadName.size(), "Unexpected number of recorded filter thread names");
        assertTrue(threadName.iterator().next().startsWith(SecondCustomExecutorProvider.NAME_FORMAT),
                "Request was not processed by the executor set on the ClientBuilder");
        tearDown(); // stopping test container
    } finally {
        // Release the client and the executor even when an assertion above fails.
        client.close();
        // NOTE(review): not relying on client.close() to stop a builder-supplied executor;
        // a repeated shutdownNow() on CustomExecutorService only re-invokes the delegate's shutdownNow().
        builderExecutor.shutdownNow();
    }
    setUp(); // re-starting test container to ensure proper post-test tearDown.
}
}