RouteSegmentedConnPoolTest.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.pool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Test;
public class RouteSegmentedConnPoolTest {
private static <R, C extends ModalCloseable> RouteSegmentedConnPool<R, C> newPool(
final int defPerRoute, final int maxTotal, final TimeValue ttl, final PoolReusePolicy reuse,
final DisposalCallback<C> disposal) {
return new RouteSegmentedConnPool<>(defPerRoute, maxTotal, ttl, reuse, disposal);
}
@Test
void basicLeaseReleaseAndHandoff() throws Exception {
final DisposalCallback<FakeConnection> disposal = FakeConnection::close;
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
final PoolEntry<String, FakeConnection> e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertNotNull(e1);
assertEquals("r1", e1.getRoute());
assertFalse(e1.hasConnection());
e1.assignConnection(new FakeConnection());
e1.updateState("A");
e1.updateExpiry(TimeValue.ofSeconds(30));
pool.release(e1, true);
final Future<PoolEntry<String, FakeConnection>> f2 =
pool.lease("r1", "A", Timeout.ofSeconds(1), null);
final PoolEntry<String, FakeConnection> e2 = f2.get(1, TimeUnit.SECONDS);
assertSame(e1, e2, "Should receive same entry via direct hand-off");
pool.release(e2, true);
pool.close(CloseMode.IMMEDIATE);
}
@Test
void perRouteAndTotalLimits() throws Exception {
final DisposalCallback<FakeConnection> disposal = FakeConnection::close;
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
final PoolEntry<String, FakeConnection> r1a = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final PoolEntry<String, FakeConnection> r2a = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final Future<PoolEntry<String, FakeConnection>> blocked = pool.lease("r1", null, Timeout.ofMilliseconds(150), null);
final ExecutionException ex = assertThrows(
ExecutionException.class,
() -> blocked.get(400, TimeUnit.MILLISECONDS));
assertInstanceOf(TimeoutException.class, ex.getCause());
assertEquals("Lease timed out", ex.getCause().getMessage());
r1a.assignConnection(new FakeConnection());
r1a.updateExpiry(TimeValue.ofSeconds(5));
pool.release(r1a, true);
final PoolEntry<String, FakeConnection> r1b =
pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertNotNull(r1b);
pool.release(r2a, false); // drop
pool.release(r1b, false);
pool.close(CloseMode.IMMEDIATE);
}
@Test
void stateCompatibilityNullMatchesAnything() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close);
final PoolEntry<String, FakeConnection> e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
e.assignConnection(new FakeConnection());
e.updateState("X");
e.updateExpiry(TimeValue.ofSeconds(30));
pool.release(e, true);
// waiter with null state must match
final PoolEntry<String, FakeConnection> got =
pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertSame(e, got);
pool.release(got, false);
pool.close(CloseMode.IMMEDIATE);
}
@Test
void closeIdleRemovesStaleAvailable() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close);
final PoolEntry<String, FakeConnection> e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
e.assignConnection(new FakeConnection());
e.updateExpiry(TimeValue.ofSeconds(30));
pool.release(e, true);
// sleep to make it idle
Thread.sleep(120);
pool.closeIdle(TimeValue.ofMilliseconds(50));
final PoolStats stats = pool.getStats("r");
assertEquals(0, stats.getAvailable());
pool.close(CloseMode.IMMEDIATE);
}
@Test
void closeExpiredHonorsEntryExpiryOrTtl() throws Exception {
// TTL = 100ms, so entries become past-ttl quickly
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.ofMilliseconds(100), PoolReusePolicy.LIFO, FakeConnection::close);
final PoolEntry<String, FakeConnection> e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
e.assignConnection(new FakeConnection());
// keep alive "far", TTL will still kill it
e.updateExpiry(TimeValue.ofSeconds(10));
pool.release(e, true);
Thread.sleep(150);
pool.closeExpired();
final PoolStats stats = pool.getStats("r");
assertEquals(0, stats.getAvailable(), "Expired/TTL entry should be gone");
pool.close(CloseMode.IMMEDIATE);
}
@Test
void waiterTimesOutAndIsFailed() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close);
// Occupy single slot and don't release
final PoolEntry<String, FakeConnection> e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final Future<PoolEntry<String, FakeConnection>> waiter =
pool.lease("r", null, Timeout.ofMilliseconds(150), null);
final ExecutionException ex = assertThrows(
ExecutionException.class,
() -> waiter.get(500, TimeUnit.MILLISECONDS));
assertInstanceOf(TimeoutException.class, ex.getCause());
assertEquals("Lease timed out", ex.getCause().getMessage());
// cleanup
pool.release(e, false);
pool.close(CloseMode.IMMEDIATE);
}
@Test
void poolCloseCancelsWaitersAndDrainsAvailable() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close);
// Consume the only slot so the next lease becomes a waiter
final Future<PoolEntry<String, FakeConnection>> first = pool.lease("r", null, Timeout.ofSeconds(5), null);
first.get(); // allocated immediately, not released
// Now this one queues as a waiter
final Future<PoolEntry<String, FakeConnection>> waiter =
pool.lease("r", null, Timeout.ofSeconds(5), null);
pool.close(CloseMode.IMMEDIATE);
final ExecutionException ex = assertThrows(ExecutionException.class, waiter::get);
assertInstanceOf(TimeoutException.class, ex.getCause());
assertEquals("Pool closed", ex.getCause().getMessage());
}
@Test
void reusePolicyLifoVsFifoIsObservable() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> lifo =
newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close);
final PoolEntry<String, FakeConnection> a = lifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final PoolEntry<String, FakeConnection> b = lifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
a.assignConnection(new FakeConnection());
a.updateExpiry(TimeValue.ofSeconds(10));
lifo.release(a, true);
b.assignConnection(new FakeConnection());
b.updateExpiry(TimeValue.ofSeconds(10));
lifo.release(b, true);
final PoolEntry<String, FakeConnection> firstLifo =
lifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertSame(b, firstLifo, "LIFO should return last released");
lifo.release(firstLifo, false);
lifo.close(CloseMode.IMMEDIATE);
final RouteSegmentedConnPool<String, FakeConnection> fifo =
newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.FIFO, FakeConnection::close);
final PoolEntry<String, FakeConnection> a2 = fifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final PoolEntry<String, FakeConnection> b2 = fifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
a2.assignConnection(new FakeConnection());
a2.updateExpiry(TimeValue.ofSeconds(10));
fifo.release(a2, true);
b2.assignConnection(new FakeConnection());
b2.updateExpiry(TimeValue.ofSeconds(10));
fifo.release(b2, true);
final PoolEntry<String, FakeConnection> firstFifo =
fifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertSame(a2, firstFifo, "FIFO should return first released");
fifo.release(firstFifo, false);
fifo.close(CloseMode.IMMEDIATE);
}
@Test
void disposalIsCalledOnDiscard() throws Exception {
final List<FakeConnection> closed = new ArrayList<>();
final CountDownLatch disposed = new CountDownLatch(1);
final DisposalCallback<FakeConnection> disposal = (c, m) -> {
try {
c.close(m);
} finally {
closed.add(c);
disposed.countDown();
}
};
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
final PoolEntry<String, FakeConnection> e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final FakeConnection conn = new FakeConnection();
e.assignConnection(conn);
pool.release(e, false);
// Wait for async disposer to run
assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time");
assertEquals(1, closed.size());
assertEquals(1, closed.get(0).closeCount());
pool.close(CloseMode.IMMEDIATE);
}
@Test
void slowDisposalDoesNotBlockOtherRoutes() throws Exception {
final CountDownLatch disposed = new CountDownLatch(1);
final AtomicLong closedAt = new AtomicLong(0L);
final DisposalCallback<FakeConnection> disposal = (c, m) -> {
try {
c.close(m); // FakeConnection sleeps closeDelayMs internally
} finally {
closedAt.set(System.nanoTime());
disposed.countDown();
}
};
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
final PoolEntry<String, FakeConnection> e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
e1.assignConnection(new FakeConnection(600)); // close sleeps ~600ms
final long startDiscard = System.nanoTime();
pool.release(e1, false); // triggers async disposal
// Lease on another route must not be blocked by slow disposal
final long t0 = System.nanoTime();
final PoolEntry<String, FakeConnection> e2 = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final long tLeaseMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
assertTrue(tLeaseMs < 200, "Other route lease blocked by disposal: " + tLeaseMs + "ms");
pool.release(e2, false);
// Wait for disposer to finish, then assert the slow path really took ~600ms
assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time");
final long discardMs = TimeUnit.NANOSECONDS.toMillis(closedAt.get() - startDiscard);
assertTrue(discardMs >= 600, "Discard should reflect slow close path (took " + discardMs + "ms)");
pool.close(CloseMode.IMMEDIATE);
}
@Test
void getRoutesCoversAllocatedAvailableAndWaiters() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close);
assertTrue(pool.getRoutes().isEmpty(), "Initially there should be no routes");
// Allocate on rA
final PoolEntry<String, FakeConnection> a =
pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertEquals(new HashSet<String>(Collections.singletonList("rA")), pool.getRoutes(),
"rA must be listed because it is leased (allocated > 0)");
// Make rA available
a.assignConnection(new FakeConnection());
a.updateExpiry(TimeValue.ofSeconds(30));
pool.release(a, true);
assertEquals(new HashSet<>(Collections.singletonList("rA")), pool.getRoutes(),
"rA must be listed because it has AVAILABLE entries");
// Enqueue waiter on rB (will time out)
final Future<PoolEntry<String, FakeConnection>> waiterB =
pool.lease("rB", null, Timeout.ofMilliseconds(300), null);
final Set<String> routesNow = pool.getRoutes();
assertTrue(routesNow.contains("rA") && routesNow.contains("rB"),
"Both rA (available) and rB (waiter) must be listed");
// Let rB time out (do NOT free capacity before the timeout fires)
final ExecutionException ex = assertThrows(
ExecutionException.class,
() -> waiterB.get(600, TimeUnit.MILLISECONDS));
assertInstanceOf(TimeoutException.class, ex.getCause());
assertEquals("Lease timed out", ex.getCause().getMessage());
// Now drain rA by leasing and discarding to trigger segment cleanup
final PoolEntry<String, FakeConnection> a2 =
pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
pool.release(a2, false); // discard
final Set<String> afterDropA = pool.getRoutes();
assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up");
assertFalse(afterDropA.contains("rB"), "rB waiter timed out; should not remain listed");
// Final cleanup
pool.close(CloseMode.IMMEDIATE);
assertTrue(pool.getRoutes().isEmpty(), "All routes must be gone after close()");
}
}