TestPoolingHttpClientConnectionManagerOffLockDisposal.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.impl.io;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLSession;

import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.io.ConnectionEndpoint;
import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.io.HttpConnectionFactory;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Test;

class TestPoolingHttpClientConnectionManagerOffLockDisposal {

    // Simulates slow close only for GRACEFUL
    static final class SleeperConnection implements ManagedHttpClientConnection {
        private volatile boolean open = true;
        private volatile Timeout soTimeout = Timeout.DISABLED;
        private final long sleepMillis;

        SleeperConnection(final long sleepMillis) {
            this.sleepMillis = sleepMillis;
        }

        @Override
        public void bind(final Socket socket) {
        }

        @Override
        public void close(final CloseMode closeMode) {
            try {
                if (closeMode == CloseMode.GRACEFUL) {
                    Thread.sleep(sleepMillis);
                }
            } catch (final InterruptedException ignore) {
                Thread.currentThread().interrupt();
            } finally {
                open = false;
            }
        }

        @Override
        public void close() {
            close(CloseMode.GRACEFUL);
        }

        @Override
        public EndpointDetails getEndpointDetails() {
            return null;
        }

        @Override
        public SocketAddress getLocalAddress() {
            return null;
        }

        @Override
        public SocketAddress getRemoteAddress() {
            return null;
        }

        @Override
        public Socket getSocket() {
            return null;
        }

        @Override
        public SSLSession getSSLSession() {
            return null;
        }

        @Override
        public void passivate() {
        }

        @Override
        public void activate() {
        }

        @Override
        public boolean isOpen() {
            return open;
        }

        @Override
        public boolean isConsistent() {
            return true;
        }

        @Override
        public boolean isDataAvailable(final Timeout timeout) {
            return false;
        }

        @Override
        public boolean isStale() {
            return false;
        }

        @Override
        public void setSocketTimeout(final Timeout timeout) {
            this.soTimeout = timeout;
        }

        @Override
        public Timeout getSocketTimeout() {
            return soTimeout;
        }

        @Override
        public void sendRequestHeader(final ClassicHttpRequest request) {
        }

        @Override
        public void sendRequestEntity(final ClassicHttpRequest request) {
        }

        @Override
        public void flush() {
        }

        @Override
        public ClassicHttpResponse receiveResponseHeader() {
            return null;
        }

        @Override
        public void receiveResponseEntity(final ClassicHttpResponse response) {
        }

        @Override
        public void terminateRequest(final ClassicHttpRequest request) {
        }

        @Override
        public ProtocolVersion getProtocolVersion() {
            return null;
        }
    }

    private static HttpConnectionFactory<ManagedHttpClientConnection> sleeperFactory(final long ms) {
        return socket -> new SleeperConnection(ms);
    }

    private static ConnectionEndpoint lease(final PoolingHttpClientConnectionManager mgr,
                                            final String id, final HttpRoute route, final Object state,
                                            final long sec) throws Exception {
        return mgr.lease(id, route, Timeout.ofSeconds((int) sec), state).get(Timeout.ofSeconds((int) sec));
    }

    // Measure only the lease latency; release happens outside this window
    private static long leaseAndMeasure(final PoolingHttpClientConnectionManager mgr,
                                        final String id, final HttpRoute route, final Object state,
                                        final long sec) throws Exception {
        final long start = System.nanoTime();
        final ConnectionEndpoint ep = lease(mgr, id, route, state, sec);
        final long elapsed = (System.nanoTime() - start) / 1_000_000L;
        mgr.release(ep, state, TimeValue.ofSeconds(30)); // keep-alive, goes back to AVAILABLE
        return elapsed;
    }

    private static PoolingHttpClientConnectionManager newMgrStrict(final long sleeperMs) {
        return PoolingHttpClientConnectionManagerBuilder.create()
                .setOffLockDisposalEnabled(true)
                .setConnPoolPolicy(PoolReusePolicy.LIFO)
                .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT)
                .setConnectionFactory(sleeperFactory(sleeperMs))
                .build();
    }

    private static PoolingHttpClientConnectionManager newMgrLax(final long sleeperMs) {
        return PoolingHttpClientConnectionManagerBuilder.create()
                .setOffLockDisposalEnabled(true)
                .setConnPoolPolicy(PoolReusePolicy.LIFO)
                .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX)
                .setConnectionFactory(sleeperFactory(sleeperMs))
                .build();
    }

    @Test
    void strictEviction_offLock_otherThreadLeasesFast() throws Exception {
        final PoolingHttpClientConnectionManager mgr = newMgrStrict(1200);

        final HttpRoute rA = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "a.example", 80));
        final HttpRoute rB = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "b.example", 80));
        final HttpRoute rC = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "c.example", 80));

        mgr.setMaxTotal(2);
        mgr.setMaxPerRoute(rA, 1);
        mgr.setMaxPerRoute(rB, 1);
        mgr.setMaxPerRoute(rC, 1);

        final ConnectionEndpoint epA0 = lease(mgr, "seedA", rA, null, 2);
        mgr.release(epA0, null, TimeValue.ofSeconds(30));
        final ConnectionEndpoint epB0 = lease(mgr, "seedB", rB, null, 2);
        mgr.release(epB0, null, TimeValue.ofSeconds(30));

        final ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            final Callable<Long> t1Lease = () -> {
                final long start = System.nanoTime();
                final ConnectionEndpoint epC = lease(mgr, "t1", rC, null, 3);
                mgr.release(epC, null, TimeValue.ofSeconds(5));
                return (System.nanoTime() - start) / 1_000_000L;
            };

            final Callable<Long> t2Lease = () -> {
                // small stagger so we overlap with t1���s drain window
                Thread.sleep(200);
                return leaseAndMeasure(mgr, "t2", rA, null, 2);
            };

            final long t1LeaseMs = es.submit(t1Lease).get(6, TimeUnit.SECONDS);
            final long t2LeaseMs = es.submit(t2Lease).get(6, TimeUnit.SECONDS);

            assertTrue(t1LeaseMs >= 900L, "T1 lease should include slow drain: " + t1LeaseMs + "ms");

            assertTrue(t2LeaseMs < 1900L, "T2 lease should complete without timing out: " + t2LeaseMs + "ms");
        } finally {
            es.shutdownNow();
            mgr.close(CloseMode.IMMEDIATE);
        }
    }


    @Test
    void leaseNotBlocked_LAX_stateMismatchDiscard_offLockDisposal() throws Exception {
        final PoolingHttpClientConnectionManager mgr = newMgrLax(1200);

        final HttpRoute route = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "lax.example", 80));
        mgr.setMaxTotal(2);
        mgr.setMaxPerRoute(route, 2);

        final ConnectionEndpoint epA = lease(mgr, "tA", route, "A", 2);
        mgr.release(epA, "A", TimeValue.ofSeconds(30));

        final ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            final Callable<Long> t1Lease = () -> {
                final long start = System.nanoTime();
                final ConnectionEndpoint epB = lease(mgr, "tB", route, "B", 3); // state mismatch discard
                // drainDisposals() runs in finally inside LeaseRequest#get ��� this lease measures the drain
                mgr.release(epB, "B", TimeValue.ofSeconds(5));
                return (System.nanoTime() - start) / 1_000_000L;
            };

            // T2: concurrent lease "B" should be fast
            final Callable<Long> t2Lease = () -> {
                Thread.sleep(50);
                return leaseAndMeasure(mgr, "t2", route, "B", 2);
            };

            final long t1LeaseMs = es.submit(t1Lease).get(6, TimeUnit.SECONDS);
            final long t2LeaseMs = es.submit(t2Lease).get(6, TimeUnit.SECONDS);

            // With drain in LeaseRequest#get, T1 lease is slow; T2 remains fast.
            assertTrue(t1LeaseMs >= 1000L, "T1 lease should include slow drain: " + t1LeaseMs + "ms");
            assertTrue(t2LeaseMs < 300L, "Lease blocked by in-thread discard: " + t2LeaseMs + "ms");

        } finally {
            es.shutdownNow();
            mgr.close(CloseMode.IMMEDIATE);
        }
    }
}