InternalHttpAsyncExecRuntimeQueueCapTest.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.EndpointInfo;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
public class InternalHttpAsyncExecRuntimeQueueCapTest {
@Test
void testFailFastWhenQueueFull() throws Exception {
final FakeEndpoint endpoint = new FakeEndpoint();
final FakeManager manager = new FakeManager(endpoint);
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
2,
new AtomicInteger()
);
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime result) {
}
@Override
public void failed(final Exception ex) {
fail(ex);
}
@Override
public void cancelled() {
fail("cancelled");
}
});
final CountDownLatch rejected = new CountDownLatch(1);
final LatchingHandler h1 = new LatchingHandler();
final LatchingHandler h2 = new LatchingHandler();
runtime.execute("r1", h1, ctx);
runtime.execute("r2", h2, ctx);
final LatchingHandler h3 = new LatchingHandler() {
@Override
public void failed(final Exception cause) {
super.failed(cause);
rejected.countDown();
}
};
runtime.execute("r3", h3, ctx);
assertTrue(rejected.await(2, TimeUnit.SECONDS), "r3 should be failed fast");
assertTrue(h3.failedException.get() instanceof RejectedExecutionException);
assertNull(h1.failedException.get());
assertNull(h2.failedException.get());
}
@Test
void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception {
final FakeEndpoint endpoint = new FakeEndpoint();
final FakeManager manager = new FakeManager(endpoint);
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
1,
new AtomicInteger()
);
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime result) {
}
@Override
public void failed(final Exception ex) {
fail(ex);
}
@Override
public void cancelled() {
fail("cancelled");
}
});
final LatchingHandler h1 = new LatchingHandler();
runtime.execute("r1", h1, ctx);
final LatchingHandler h2 = new LatchingHandler();
runtime.execute("r2", h2, ctx);
assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS));
assertTrue(h2.failedException.get() instanceof RejectedExecutionException);
// free the slot via releaseResources(), not failed()
endpoint.completeOne();
final LatchingHandler h3 = new LatchingHandler();
runtime.execute("r3", h3, ctx);
Thread.sleep(150);
assertNull(h3.failedException.get(), "r3 should not be rejected after slot released");
h3.cancel();
}
private static final class NoopInitiator implements ConnectionInitiator {
@Override
public Future<IOSession> connect(final NamedEndpoint endpoint,
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final Timeout timeout,
final Object attachment,
final FutureCallback<IOSession> callback) {
final CompletableFuture<IOSession> cf = new CompletableFuture<>();
final UnsupportedOperationException ex = new UnsupportedOperationException("not used");
cf.completeExceptionally(ex);
if (callback != null) {
callback.failed(ex);
}
return cf;
}
}
private static final class NoopPushFactory implements HandlerFactory<AsyncPushConsumer> {
@Override
public AsyncPushConsumer create(final HttpRequest request, final HttpContext context) {
return null;
}
}
private static final class FakeManager implements AsyncClientConnectionManager {
private final AsyncConnectionEndpoint endpoint;
FakeManager(final AsyncConnectionEndpoint endpoint) {
this.endpoint = endpoint;
}
@Override
public Future<AsyncConnectionEndpoint> lease(final String id,
final HttpRoute route,
final Object state,
final Timeout requestTimeout,
final FutureCallback<AsyncConnectionEndpoint> callback) {
final CompletableFuture<AsyncConnectionEndpoint> cf = CompletableFuture.completedFuture(endpoint);
if (callback != null) {
callback.completed(endpoint);
}
return cf;
}
@Override
public Future<AsyncConnectionEndpoint> connect(final AsyncConnectionEndpoint endpoint,
final ConnectionInitiator connectionInitiator,
final Timeout connectTimeout,
final Object attachment,
final HttpContext context,
final FutureCallback<AsyncConnectionEndpoint> callback) {
((FakeEndpoint) this.endpoint).connected = true;
final CompletableFuture<AsyncConnectionEndpoint> cf = CompletableFuture.completedFuture(endpoint);
if (callback != null) {
callback.completed(endpoint);
}
return cf;
}
@Override
public void upgrade(final AsyncConnectionEndpoint endpoint,
final Object attachment,
final HttpContext context) {
}
@Override
public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
}
@Override
public void close(final CloseMode closeMode) {
}
@Override
public void close() {
}
}
private static final class FakeEndpoint extends AsyncConnectionEndpoint {
volatile boolean connected = true;
private final ConcurrentLinkedQueue<AsyncClientExchangeHandler> inFlight = new ConcurrentLinkedQueue<>();
@Override
public void execute(final String id,
final AsyncClientExchangeHandler handler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
// keep the guarded handler so tests can signal terminal events
inFlight.add(handler);
}
// helpers for tests
void failOne(final Exception ex) {
final AsyncClientExchangeHandler h = inFlight.poll();
if (h != null) {
h.failed(ex);
}
}
void cancelOne() {
final AsyncClientExchangeHandler h = inFlight.poll();
if (h != null) {
h.cancel();
}
}
void completeOne() {
final AsyncClientExchangeHandler h = inFlight.poll();
if (h != null) {
h.releaseResources();
}
}
@Override
public boolean isConnected() {
return connected;
}
@Override
public void setSocketTimeout(final Timeout timeout) {
}
@Override
public void close(final CloseMode closeMode) {
connected = false;
}
@Override
public EndpointInfo getInfo() {
return null;
}
}
private static class LatchingHandler implements AsyncClientExchangeHandler {
final AtomicReference<Exception> failedException = new AtomicReference<>();
final CountDownLatch failLatch = new CountDownLatch(1);
boolean awaitFailed(final long t, final TimeUnit u) throws InterruptedException {
return failLatch.await(t, u);
}
@Override
public void produceRequest(final RequestChannel channel, final HttpContext context) {
}
@Override
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) {
}
@Override
public void consumeInformation(final HttpResponse response, final HttpContext context) {
}
@Override
public void cancel() {
}
@Override
public int available() {
return 0;
}
@Override
public void produce(final DataStreamChannel channel) {
}
@Override
public void updateCapacity(final CapacityChannel capacityChannel) {
}
@Override
public void consume(final ByteBuffer src) {
}
@Override
public void streamEnd(final List<? extends Header> trailers) {
}
@Override
public void releaseResources() {
}
@Override
public void failed(final Exception cause) {
failedException.compareAndSet(null, Objects.requireNonNull(cause));
failLatch.countDown();
}
}
@Test
void testRecursiveReentryCausesSOEWithoutCap() {
final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
final FakeManager manager = new FakeManager(endpoint);
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
-1,
null // no cap, no counter
);
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime result) {
}
@Override
public void failed(final Exception ex) {
fail(ex);
}
@Override
public void cancelled() {
fail("cancelled");
}
});
final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
assertThrows(StackOverflowError.class, () -> {
runtime.execute("loop", loop, ctx); // execute -> endpoint.execute -> failed() -> execute -> ...
});
}
@Test
void testCapBreaksRecursiveReentry() throws Exception {
final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
final FakeManager manager = new FakeManager(endpoint);
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
1,
new AtomicInteger()
);
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime result) {
}
@Override
public void failed(final Exception ex) {
fail(ex);
}
@Override
public void cancelled() {
fail("cancelled");
}
});
final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
// Should NOT blow the stack; the re-entrant call should be rejected.
runtime.execute("loop", loop, ctx);
// allow the immediate fail+re-submit path to run
Thread.sleep(50);
assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
"Expected rejection to break the recursion");
}
/**
* Endpoint that synchronously fails any handler passed to execute().
*/
private static final class ImmediateFailEndpoint extends AsyncConnectionEndpoint {
volatile boolean connected = true;
@Override
public void execute(final String id,
final AsyncClientExchangeHandler handler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
handler.failed(new IOException("immediate failure"));
}
@Override
public boolean isConnected() {
return connected;
}
@Override
public void setSocketTimeout(final Timeout timeout) {
}
@Override
public void close(final CloseMode closeMode) {
connected = false;
}
@Override
public EndpointInfo getInfo() {
return null;
}
}
private static final class ReentrantHandler implements AsyncClientExchangeHandler {
private final InternalHttpAsyncExecRuntime runtime;
private final HttpClientContext ctx;
final AtomicReference<Exception> lastException = new AtomicReference<>();
ReentrantHandler(final InternalHttpAsyncExecRuntime runtime, final HttpClientContext ctx) {
this.runtime = runtime;
this.ctx = ctx;
}
@Override
public void failed(final Exception cause) {
lastException.set(cause);
// Re-enter only if this was NOT the cap rejecting us
if (!(cause instanceof RejectedExecutionException)) {
runtime.execute("loop/reenter", this, ctx);
}
}
@Override
public void produceRequest(final RequestChannel channel, final HttpContext context) {
}
@Override
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) {
}
@Override
public void consumeInformation(final HttpResponse response, final HttpContext context) {
}
@Override
public void cancel() {
}
@Override
public int available() {
return 0;
}
@Override
public void produce(final DataStreamChannel channel) {
}
@Override
public void updateCapacity(final CapacityChannel capacityChannel) {
}
@Override
public void consume(final ByteBuffer src) {
}
@Override
public void streamEnd(final List<? extends Header> trailers) {
}
@Override
public void releaseResources() {
}
}
}