RouteSegmentedConnPool.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.pool;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
/**
* Lock-free, route-segmented connection pool with tiny, conditional round-robin assistance.
*
* <p>Per-route state is kept in independent segments. Disposal of connections is offloaded
* to a bounded executor so slow closes do not block threads leasing on other routes.
* A minimal round-robin drainer is engaged only when there are many pending routes and
* there is global headroom; it never scans all routes.</p>
*
* @param <R> route key type
* @param <C> connection type (must be {@link ModalCloseable})
* @see ManagedConnPool
* @see PoolReusePolicy
* @see DisposalCallback
* @since 5.4
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
@Experimental
public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implements ManagedConnPool<R, C> {
// Tiny RR assist: only engage when there are many distinct routes waiting and there is headroom.
private static final int RR_MIN_PENDING_ROUTES = 12;
private static final int RR_BUDGET = 64;
private final PoolReusePolicy reusePolicy;
private final TimeValue timeToLive;
private final DisposalCallback<C> disposal;
private final AtomicInteger defaultMaxPerRoute = new AtomicInteger(5);
private final ConcurrentHashMap<R, Segment> segments = new ConcurrentHashMap<>();
private final ConcurrentHashMap<R, Integer> maxPerRoute = new ConcurrentHashMap<>();
private final AtomicInteger totalAllocated = new AtomicInteger(0);
private final AtomicInteger maxTotal = new AtomicInteger(25);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ScheduledExecutorService timeouts;
/**
* Dedicated executor for asynchronous, best-effort disposal.
* Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread.
*/
private final ThreadPoolExecutor disposer;
// Minimal fair round-robin over routes with waiters (no global scans).
private final ConcurrentLinkedQueue<R> pendingQueue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean draining = new AtomicBoolean(false);
private final AtomicInteger pendingRouteCount = new AtomicInteger(0);
public RouteSegmentedConnPool(
final int defaultMaxPerRoute,
final int maxTotal,
final TimeValue timeToLive,
final PoolReusePolicy reusePolicy,
final DisposalCallback<C> disposal) {
this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5);
this.maxTotal.set(maxTotal > 0 ? maxTotal : 25);
this.timeToLive = timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECOND;
this.reusePolicy = reusePolicy != null ? reusePolicy : PoolReusePolicy.LIFO;
this.disposal = Args.notNull(disposal, "disposal");
final ThreadFactory tf = r -> {
final Thread t = new Thread(r, "seg-pool-timeouts");
t.setDaemon(true);
return t;
};
this.timeouts = Executors.newSingleThreadScheduledExecutor(tf);
// Asynchronous disposer for slow GRACEFUL closes.
final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
final int nThreads = Math.min(8, Math.max(2, cores)); // allow up to 8 on bigger boxes
final int qsize = 1024;
final ThreadFactory df = r -> {
final Thread t = new Thread(r, "seg-pool-disposer");
t.setDaemon(true);
return t;
};
this.disposer = new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(qsize),
df,
new ThreadPoolExecutor.AbortPolicy()); // but we preflight capacity to avoid exception storms
}
final class Segment {
final ConcurrentLinkedDeque<PoolEntry<R, C>> available = new ConcurrentLinkedDeque<>();
final ConcurrentLinkedDeque<Waiter> waiters = new ConcurrentLinkedDeque<>();
final AtomicInteger allocated = new AtomicInteger(0);
final AtomicBoolean enqueued = new AtomicBoolean(false);
int limitPerRoute(final R route) {
final Integer v = maxPerRoute.get(route);
return v != null ? v : defaultMaxPerRoute.get();
}
}
final class Waiter extends CompletableFuture<PoolEntry<R, C>> {
final R route;
final Timeout requestTimeout;
final Object state;
volatile boolean cancelled;
volatile ScheduledFuture<?> timeoutTask;
Waiter(final R route, final Timeout t, final Object s) {
this.route = route;
this.requestTimeout = t != null ? t : Timeout.DISABLED;
this.state = s;
this.cancelled = false;
this.timeoutTask = null;
}
}
@Override
public Future<PoolEntry<R, C>> lease(
final R route,
final Object state,
final Timeout requestTimeout,
final FutureCallback<PoolEntry<R, C>> callback) {
ensureOpen();
final Segment seg = segments.computeIfAbsent(route, r -> new Segment());
// 1) Try available
PoolEntry<R, C> hit;
for (; ; ) {
hit = pollAvailable(seg, state);
if (hit == null) {
break;
}
final long now = System.currentTimeMillis();
if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit)) {
discardAndDecr(hit, CloseMode.GRACEFUL);
continue;
}
break;
}
if (hit != null) {
if (callback != null) {
callback.completed(hit);
}
return CompletableFuture.completedFuture(hit);
}
// 2) Try to allocate new within caps
if (tryAllocateOne(route, seg)) {
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
if (callback != null) {
callback.completed(entry);
}
return CompletableFuture.completedFuture(entry);
}
// 3) Enqueue waiter with timeout
final Waiter w = new Waiter(route, requestTimeout, state);
seg.waiters.addLast(w);
enqueueIfNeeded(route, seg);
// Late hit after enqueuing
final PoolEntry<R, C> late = pollAvailable(seg, state);
if (late != null) {
if (seg.waiters.remove(w)) {
cancelTimeout(w);
if (callback != null) {
callback.completed(late);
}
w.complete(late);
dequeueIfDrained(seg);
return w;
} else {
boolean handedOff = false;
for (Waiter other; (other = seg.waiters.pollFirst()) != null; ) {
if (!other.cancelled && compatible(other.state, late.getState())) {
cancelTimeout(other);
handedOff = other.complete(late);
if (handedOff) {
break;
}
}
}
if (!handedOff) {
offerAvailable(seg, late);
}
}
}
scheduleTimeout(w, seg);
if (callback != null) {
w.whenComplete((pe, ex) -> {
if (ex != null) {
callback.failed(ex instanceof Exception ? (Exception) ex : new Exception(ex));
} else {
callback.completed(pe);
}
});
}
triggerDrainIfMany();
return w;
}
@Override
public void release(final PoolEntry<R, C> entry, final boolean reusable) {
if (entry == null) {
return;
}
final R route = entry.getRoute();
final Segment seg = segments.get(route);
if (seg == null) {
// Segment got removed; dispose off-thread and bail.
discardEntry(entry, CloseMode.GRACEFUL);
return;
}
final long now = System.currentTimeMillis();
final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now);
if (stillValid) {
if (!handOffToCompatibleWaiter(entry, seg)) {
offerAvailable(seg, entry);
enqueueIfNeeded(route, seg);
triggerDrainIfMany();
}
} else {
discardAndDecr(entry, CloseMode.GRACEFUL);
}
maybeCleanupSegment(route, seg);
}
@Override
public void close() throws IOException {
close(CloseMode.GRACEFUL);
}
@Override
public void close(final CloseMode closeMode) {
if (!closed.compareAndSet(false, true)) {
return;
}
timeouts.shutdownNow();
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
final Segment seg = e.getValue();
for (final Waiter w : seg.waiters) {
w.cancelled = true;
cancelTimeout(w);
w.completeExceptionally(new TimeoutException("Pool closed"));
}
seg.waiters.clear();
if (seg.enqueued.getAndSet(false)) {
pendingRouteCount.decrementAndGet();
}
// discard available
for (final PoolEntry<R, C> p : seg.available) {
discardEntry(p, closeMode);
}
seg.available.clear();
final int alloc = seg.allocated.getAndSet(0);
if (alloc != 0) {
totalAllocated.addAndGet(-alloc);
}
}
segments.clear();
pendingQueue.clear();
pendingRouteCount.set(0);
// Let in-flight graceful closes progress; no blocking here.
disposer.shutdown();
}
@Override
public void closeIdle(final TimeValue idleTime) {
final long cutoff = System.currentTimeMillis()
- Math.max(0L, idleTime != null ? idleTime.toMilliseconds() : 0L);
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
final R route = e.getKey();
final Segment seg = e.getValue();
int processed = 0;
final int cap = 64;
for (final Iterator<PoolEntry<R, C>> it = seg.available.iterator(); it.hasNext(); ) {
final PoolEntry<R, C> p = it.next();
if (p.getUpdated() <= cutoff) {
it.remove();
discardAndDecr(p, CloseMode.GRACEFUL);
if (++processed == cap) {
break;
}
}
}
maybeCleanupSegment(route, seg);
}
}
@Override
public void closeExpired() {
final long now = System.currentTimeMillis();
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
final R route = e.getKey();
final Segment seg = e.getValue();
int processed = 0;
final int cap = 64;
for (final Iterator<PoolEntry<R, C>> it = seg.available.iterator(); it.hasNext(); ) {
final PoolEntry<R, C> p = it.next();
if (p.getExpiryDeadline().isBefore(now) || isPastTtl(p)) {
it.remove();
discardAndDecr(p, CloseMode.GRACEFUL);
if (++processed == cap) {
break;
}
}
}
maybeCleanupSegment(route, seg);
}
}
@Override
public Set<R> getRoutes() {
final Set<R> out = new HashSet<>();
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
final Segment s = e.getValue();
if (!s.available.isEmpty() || s.allocated.get() > 0 || !s.waiters.isEmpty()) {
out.add(e.getKey());
}
}
return out;
}
@Override
public int getMaxTotal() {
return maxTotal.get();
}
@Override
public void setMaxTotal(final int max) {
maxTotal.set(Math.max(1, max));
}
@Override
public int getDefaultMaxPerRoute() {
return defaultMaxPerRoute.get();
}
@Override
public void setDefaultMaxPerRoute(final int max) {
defaultMaxPerRoute.set(Math.max(1, max));
}
@Override
public int getMaxPerRoute(final R route) {
final Integer v = maxPerRoute.get(route);
return v != null ? v : defaultMaxPerRoute.get();
}
@Override
public void setMaxPerRoute(final R route, final int max) {
if (max <= 0) {
maxPerRoute.remove(route);
} else {
maxPerRoute.put(route, max);
}
}
@Override
public PoolStats getTotalStats() {
int leased = 0, availableCount = 0, pending = 0;
for (final Segment seg : segments.values()) {
final int alloc = seg.allocated.get();
final int avail = seg.available.size();
leased += Math.max(0, alloc - avail);
availableCount += avail;
pending += seg.waiters.size();
}
return new PoolStats(leased, pending, availableCount, getMaxTotal());
}
@Override
public PoolStats getStats(final R route) {
final Segment seg = segments.get(route);
if (seg == null) {
return new PoolStats(0, 0, 0, getMaxPerRoute(route));
}
final int alloc = seg.allocated.get();
final int avail = seg.available.size();
final int leased = Math.max(0, alloc - avail);
final int pending = seg.waiters.size();
return new PoolStats(leased, pending, avail, getMaxPerRoute(route));
}
private void ensureOpen() {
if (closed.get()) {
throw new IllegalStateException("Pool is closed");
}
}
private boolean isPastTtl(final PoolEntry<R, C> p) {
if (timeToLive == null || timeToLive.getDuration() < 0) {
return false;
}
return System.currentTimeMillis() - p.getCreated() >= timeToLive.toMilliseconds();
}
private void scheduleTimeout(final Waiter w, final Segment seg) {
if (!TimeValue.isPositive(w.requestTimeout)) {
return;
}
w.timeoutTask = timeouts.schedule(() -> {
if (w.isDone()) {
return;
}
w.cancelled = true;
seg.waiters.remove(w);
w.completeExceptionally(new TimeoutException("Lease timed out"));
dequeueIfDrained(seg);
maybeCleanupSegment(w.route, seg);
final PoolEntry<R, C> p = pollAvailable(seg, w.state);
if (p != null) {
// Try to hand off that available entry to some other compatible waiter.
if (!handOffToCompatibleWaiter(p, seg)) {
offerAvailable(seg, p);
}
}
}, w.requestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
private void cancelTimeout(final Waiter w) {
final ScheduledFuture<?> t = w.timeoutTask;
if (t != null) {
t.cancel(false);
}
}
private void offerAvailable(final Segment seg, final PoolEntry<R, C> p) {
if (reusePolicy == PoolReusePolicy.LIFO) {
seg.available.addFirst(p);
} else {
seg.available.addLast(p);
}
}
private PoolEntry<R, C> pollAvailable(final Segment seg, final Object neededState) {
if (neededState == null) {
return seg.available.pollFirst();
}
for (final Iterator<PoolEntry<R, C>> it = seg.available.iterator(); it.hasNext(); ) {
final PoolEntry<R, C> p = it.next();
if (compatible(neededState, p.getState())) {
it.remove();
return p;
}
}
return null;
}
private boolean compatible(final Object needed, final Object have) {
return needed == null || Objects.equals(needed, have);
}
private boolean handOffToCompatibleWaiter(final PoolEntry<R, C> entry, final Segment seg) {
final Deque<Waiter> skipped = new ArrayDeque<>();
boolean handedOff = false;
for (; ; ) {
final Waiter w = seg.waiters.pollFirst();
if (w == null) {
break;
}
if (w.cancelled || w.isDone()) {
continue;
}
if (compatible(w.state, entry.getState())) {
cancelTimeout(w);
handedOff = w.complete(entry);
if (handedOff) {
dequeueIfDrained(seg);
break;
}
} else {
skipped.addLast(w);
}
}
// Restore non-compatible waiters to the head to preserve ordering.
while (!skipped.isEmpty()) {
seg.waiters.addFirst(skipped.pollLast());
}
return handedOff;
}
private void discardAndDecr(final PoolEntry<R, C> p, final CloseMode mode) {
totalAllocated.decrementAndGet();
final Segment seg = segments.get(p.getRoute());
if (seg != null) {
seg.allocated.decrementAndGet();
}
discardEntry(p, mode);
}
private CloseMode orImmediate(final CloseMode m) {
return m != null ? m : CloseMode.IMMEDIATE;
}
private void maybeCleanupSegment(final R route, final Segment seg) {
if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) {
segments.remove(route, seg);
if (seg.enqueued.getAndSet(false)) {
pendingRouteCount.decrementAndGet();
}
}
}
private boolean tryAllocateOne(final R route, final Segment seg) {
for (; ; ) {
final int tot = totalAllocated.get();
if (tot >= maxTotal.get()) {
return false;
}
if (!totalAllocated.compareAndSet(tot, tot + 1)) {
continue;
}
for (; ; ) {
final int per = seg.allocated.get();
if (per >= seg.limitPerRoute(route)) {
totalAllocated.decrementAndGet();
return false;
}
if (seg.allocated.compareAndSet(per, per + 1)) {
return true;
}
}
}
}
private void enqueueIfNeeded(final R route, final Segment seg) {
if (seg.enqueued.compareAndSet(false, true)) {
pendingQueue.offer(route);
pendingRouteCount.incrementAndGet();
}
}
private void dequeueIfDrained(final Segment seg) {
if (seg.waiters.isEmpty() && seg.enqueued.getAndSet(false)) {
pendingRouteCount.decrementAndGet();
}
}
private void triggerDrainIfMany() {
// Engage RR only if there is global headroom and many distinct routes pending
if (pendingRouteCount.get() < RR_MIN_PENDING_ROUTES) {
return;
}
if (totalAllocated.get() >= maxTotal.get()) {
return;
}
if (!draining.compareAndSet(false, true)) {
return;
}
disposer.execute(() -> {
try {
serveRoundRobin(RR_BUDGET);
} finally {
draining.set(false);
if (pendingRouteCount.get() >= RR_MIN_PENDING_ROUTES
&& totalAllocated.get() < maxTotal.get()
&& !pendingQueue.isEmpty()) {
triggerDrainIfMany();
}
}
});
}
private void serveRoundRobin(final int budget) {
int created = 0;
for (; created < budget; ) {
final R route = pendingQueue.poll();
if (route == null) {
break;
}
final Segment seg = segments.get(route);
if (seg == null) {
continue;
}
if (seg.waiters.isEmpty()) {
if (seg.enqueued.getAndSet(false)) {
pendingRouteCount.decrementAndGet();
}
continue;
}
if (!tryAllocateOne(route, seg)) {
// No headroom or hit per-route cap. Re-queue for later.
pendingQueue.offer(route);
continue;
}
final Waiter w = seg.waiters.pollFirst();
if (w == null || w.cancelled) {
seg.allocated.decrementAndGet();
totalAllocated.decrementAndGet();
} else {
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
cancelTimeout(w);
w.complete(entry);
created++;
}
if (!seg.waiters.isEmpty()) {
pendingQueue.offer(route);
} else {
if (seg.enqueued.getAndSet(false)) {
pendingRouteCount.decrementAndGet();
}
}
}
}
/**
* Dispose a pool entry's connection asynchronously if possible; under pressure fall back to IMMEDIATE on caller.
*/
private void discardEntry(final PoolEntry<R, C> p, final CloseMode preferred) {
final CloseMode mode = orImmediate(preferred);
// Pre-flight capacity to avoid exception storms under saturation
if (disposer.isShutdown()) {
p.discardConnection(CloseMode.IMMEDIATE);
return;
}
final LinkedBlockingQueue<Runnable> q = (LinkedBlockingQueue<Runnable>) disposer.getQueue();
if (q.remainingCapacity() == 0) {
p.discardConnection(CloseMode.IMMEDIATE);
return;
}
try {
disposer.execute(() -> {
try {
p.discardConnection(mode);
} catch (final RuntimeException ignore) {
// best-effort
}
});
} catch (final RejectedExecutionException saturated) {
// Saturated or shutting down: never block caller
p.discardConnection(CloseMode.IMMEDIATE);
}
}
}