RoutePoolsJmh.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.core5.benchmark;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.pool.DisposalCallback;
import org.apache.hc.core5.pool.LaxConnPool;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.pool.RouteSegmentedConnPool;
import org.apache.hc.core5.pool.StrictConnPool;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/**
 * JMH harness that drives StrictConnPool, LaxConnPool, and RouteSegmentedConnPool
 * against a local HTTP/1.1 mini-cluster using real sockets and keep-alive.
 */
@BenchmarkMode({Mode.Throughput})
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@OutputTimeUnit(TimeUnit.SECONDS)
public class RoutePoolsJmh {

    // ---------------------------------------------------------------
    // Utilities
    // ---------------------------------------------------------------
    /**
     * Creates a factory producing daemon threads named {@code <prefix>-<n>}.
     * The counter {@code n} starts at 1 and is private to the returned factory,
     * so every call to this method yields an independent numbering sequence.
     *
     * @param prefix name prefix shared by all threads created by the factory
     * @return a new thread factory
     */
    static ThreadFactory daemonFactory(final String prefix) {
        final AtomicInteger sequence = new AtomicInteger(1);
        return new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable task) {
                final String threadName = prefix + "-" + sequence.getAndIncrement();
                final Thread worker = new Thread(task, threadName);
                worker.setDaemon(true);
                return worker;
            }
        };
    }

    // ---------------------------------------------------------------
    // Real HTTP/1.1 persistent connection used by the pool
    // ---------------------------------------------------------------
    /**
     * Blocking HTTP/1.1 client connection over a plain {@link Socket}; this is the
     * connection type stored in the pools under test.
     * <p>
     * Closing can be artificially delayed by {@code closeDelayMs} milliseconds to
     * simulate connections that are slow to shut down.
     */
    public static final class RealConn implements ModalCloseable {
        private final String host;
        private final int port;
        private final int closeDelayMs;
        private final Socket socket;
        private final BufferedInputStream in;
        private final BufferedOutputStream out;

        /**
         * Opens a TCP connection to {@code host:port}.
         *
         * @param host             target host, also used for the {@code Host} header
         * @param port             target port
         * @param closeDelayMs     sleep applied before the socket is closed; no delay when {@code <= 0}
         * @param soTimeoutMs      socket read timeout; values below 1000 are raised to 1000
         * @param connectTimeoutMs connect timeout; values below 1 are raised to 1
         * @throws IOException if the socket cannot be configured or connected; the
         *                     socket is closed before the exception propagates
         */
        public RealConn(
                final String host,
                final int port,
                final int closeDelayMs,
                final int soTimeoutMs,
                final int connectTimeoutMs) throws IOException {
            this.host = host;
            this.port = port;
            this.closeDelayMs = closeDelayMs;
            final Socket s = new Socket();
            final BufferedInputStream input;
            final BufferedOutputStream output;
            try {
                s.setTcpNoDelay(true);
                s.setSoTimeout(Math.max(1000, soTimeoutMs)); // read timeout
                s.setKeepAlive(true);
                s.connect(new InetSocketAddress(host, port), Math.max(1, connectTimeoutMs));
                input = new BufferedInputStream(s.getInputStream(), 32 * 1024);
                output = new BufferedOutputStream(s.getOutputStream(), 32 * 1024);
            } catch (final IOException | RuntimeException ex) {
                // Do not leak the file descriptor when setup fails half-way.
                try {
                    s.close();
                } catch (final IOException ignored) {
                    // best effort; the original failure is the one worth reporting
                }
                throw ex;
            }
            this.socket = s;
            this.in = input;
            this.out = output;
        }

        /**
         * Sends one {@code GET /} request and consumes the complete response.
         * The response must have status 200 and a parseable, non-negative
         * {@code Content-Length}; the body is read and discarded.
         *
         * @param keepAlive whether to send {@code Connection: keep-alive} rather
         *                  than {@code Connection: close}
         * @throws IOException on I/O failure, premature end of stream, a malformed
         *                     or non-200 status line, or a missing/invalid
         *                     {@code Content-Length}
         */
        public void getOnce(final boolean keepAlive) throws IOException {
            final String req = "GET / HTTP/1.1\r\n" +
                    "Host: " + host + ":" + port + "\r\n" +
                    (keepAlive ? "Connection: keep-alive\r\n" : "Connection: close\r\n") +
                    "\r\n";
            out.write(req.getBytes(StandardCharsets.ISO_8859_1));
            out.flush();

            final String status = readLine();
            if (status == null) {
                throw new IOException("No status line");
            }
            final String[] parts = status.split(" ", 3);
            if (parts.length < 2 || !parts[0].startsWith("HTTP/1.")) {
                throw new IOException("Bad status: " + status);
            }
            final int code;
            try {
                code = Integer.parseInt(parts[1]);
            } catch (final NumberFormatException nfe) {
                throw new IOException("Bad status code in: " + status, nfe);
            }
            if (code != 200) {
                throw new IOException("Unexpected status: " + status);
            }

            // Scan headers up to the empty line; only Content-Length is of interest.
            int contentLength = -1;
            for (; ; ) {
                final String line = readLine();
                if (line == null) {
                    throw new IOException("EOF in headers");
                }
                if (line.isEmpty()) {
                    break;
                }
                final int colon = line.indexOf(':');
                if (colon > 0) {
                    final String name = line.substring(0, colon).trim();
                    if ("Content-Length".equalsIgnoreCase(name)) {
                        try {
                            contentLength = Integer.parseInt(line.substring(colon + 1).trim());
                        } catch (final NumberFormatException ignored) {
                            // An unparseable value leaves contentLength untouched and is
                            // reported below as a missing Content-Length.
                        }
                    }
                }
            }
            if (contentLength < 0) {
                throw new IOException("Missing Content-Length");
            }

            // Drain exactly contentLength bytes so the connection can be reused.
            int remaining = contentLength;
            final byte[] buf = new byte[8192];
            while (remaining > 0) {
                final int r = in.read(buf, 0, Math.min(buf.length, remaining));
                if (r == -1) {
                    throw new IOException("unexpected EOF in body");
                }
                remaining -= r;
            }
        }

        /**
         * Reads bytes up to the next LF (or end of stream) and decodes them as
         * ISO-8859-1, dropping one trailing CR if present.
         *
         * @return the line without its terminator, or {@code null} when the stream
         *         ended before any byte was read
         */
        private String readLine() throws IOException {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
            for (; ; ) {
                final int b = in.read();
                if (b == -1) {
                    if (baos.size() == 0) {
                        return null;
                    }
                    break;
                }
                if (b == '\n') {
                    break;
                }
                baos.write(b);
            }
            final byte[] raw = baos.toByteArray();
            final int len = raw.length;
            final int eff = (len > 0 && raw[len - 1] == '\r') ? len - 1 : len;
            return new String(raw, 0, eff, StandardCharsets.ISO_8859_1);
        }

        /**
         * Sleeps for {@code closeDelayMs} when that value is positive. If the
         * sleep is interrupted, the interrupt flag is restored and the method
         * returns early so that the socket is still closed by the caller.
         */
        private void simulateSlowClose() {
            if (closeDelayMs > 0) {
                try {
                    Thread.sleep(closeDelayMs);
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Applies the configured close delay, then closes the socket. The close
         * mode is not consulted and I/O errors from the socket are swallowed.
         */
        @Override
        public void close(final CloseMode closeMode) {
            simulateSlowClose();
            try {
                socket.close();
            } catch (final IOException ignored) {
                // nothing useful can be done with a failed close in the benchmark
            }
        }

        /**
         * Applies the configured close delay, then closes the socket.
         *
         * @throws IOException if closing the socket fails
         */
        @Override
        public void close() throws IOException {
            simulateSlowClose();
            socket.close();
        }
    }

    // ---------------------------------------------------------------
    // Benchmark state & setup
    // ---------------------------------------------------------------
    /**
     * Per-trial benchmark state: the pool under test, the local HTTP servers it
     * connects to, and a background task that periodically closes idle and
     * expired pool entries.
     */
    @State(Scope.Benchmark)
    public static class BenchState {
        /** Pool implementation: OFFLOCK (RouteSegmentedConnPool), STRICT or LAX. */
        @Param({"OFFLOCK", "STRICT", "LAX"})
        public String policy;
        /** Number of local HTTP servers, i.e. distinct routes. */
        @Param({"1", "4", "10", "25", "50"})
        public int routes;
        /** Size of the response body served by every server. */
        @Param({"128"})
        public int payloadBytes;
        /** Total connection limit handed to the pool. */
        @Param({"100"})
        public int maxTotal;
        /** Default per-route connection limit handed to the pool. */
        @Param({"5"})
        public int defMaxPerRoute;
        /** Whether requests ask for keep-alive and entries may be released as reusable. */
        @Param({"true"})
        public boolean keepAlive;
        /** Expiry, in milliseconds, set on an entry before it is released as reusable. */
        @Param({"5000"})
        public int keepAliveMs;
        /** Percentage of operations that discard the connection instead of reusing it. */
        @Param({"0", "20"})
        public int slowClosePct;
        /** Artificial delay, in milliseconds, applied when a connection is closed. */
        @Param({"0", "200"})
        public int closeSleepMs;
        /** Socket read timeout in milliseconds for new connections. */
        @Param({"10000"})
        public int soTimeoutMs;
        /** Maximum time in milliseconds a benchmark thread waits for a lease. */
        @Param({"30000"})
        public int requestTimeoutMs;
        /** Connect timeout in milliseconds for new connections. */
        @Param({"1000"})
        public int connectTimeoutMs;

        ManagedConnPool<String, RealConn> pool;
        DisposalCallback<RealConn> disposal;
        MiniCluster cluster;
        String[] routeKeys;
        ScheduledExecutorService maint;

        /**
         * Builds the pool selected by {@link #policy}, starts the local servers
         * and schedules the periodic maintenance task.
         *
         * @throws IllegalArgumentException if {@link #policy} is not recognised
         * @throws Exception if the local servers cannot be started
         */
        @Setup(Level.Trial)
        public void setUp() throws Exception {
            disposal = (c, m) -> {
                if (c != null) {
                    c.close(m);
                }
            };
            final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND;
            // Build the pool first: an unknown policy is rejected here, before any
            // server socket or thread has been created that would need cleaning up.
            switch (policy.toUpperCase(Locale.ROOT)) {
                case "STRICT": {
                    pool = new StrictConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal, null);
                    break;
                }
                case "LAX": {
                    final LaxConnPool<String, RealConn> lax = new LaxConnPool<>(defMaxPerRoute, ttl, PoolReusePolicy.LIFO, disposal, null);
                    lax.setMaxTotal(maxTotal);
                    pool = lax;
                    break;
                }
                case "OFFLOCK": {
                    pool = new RouteSegmentedConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown policy: " + policy);
                }
            }
            cluster = new MiniCluster(routes, payloadBytes);
            routeKeys = cluster.routeKeys();
            // Light periodic maintenance, close idle/expired like real clients do
            maint = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(daemonFactory("pool-maint"));
            maint.scheduleAtFixedRate(() -> {
                try {
                    pool.closeIdle(TimeValue.ofSeconds(5));
                    pool.closeExpired();
                } catch (final Exception ignored) {
                    // A failed maintenance pass must not cancel the periodic task.
                }
            }, 5, 5, TimeUnit.SECONDS);
        }

        /**
         * Stops the maintenance task, then closes the pool, then shuts the local
         * servers down. The servers are stopped even if closing the pool throws.
         */
        @TearDown(Level.Trial)
        public void tearDown() {
            // Stop maintenance first so it cannot run against a pool being closed.
            if (maint != null) {
                maint.shutdownNow();
                try {
                    maint.awaitTermination(5, TimeUnit.SECONDS);
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            try {
                if (pool != null) {
                    pool.close(CloseMode.IMMEDIATE);
                }
            } finally {
                if (cluster != null) {
                    cluster.close();
                }
            }
        }

        /** Returns a uniformly random route key from {@link #routeKeys}. */
        String pickRoute() {
            final int idx = ThreadLocalRandom.current().nextInt(routeKeys.length);
            return routeKeys[idx];
        }

        /**
         * Decides at random, with probability {@code slowClosePct / 100}, whether
         * the current operation should discard its connection.
         */
        boolean shouldDiscard() {
            return slowClosePct > 0 && ThreadLocalRandom.current().nextInt(100) < slowClosePct;
        }
    }

    // ---------------------------------------------------------------
    // Benchmark body
    // ---------------------------------------------------------------
    /**
     * One benchmark operation: lease a pool entry for a random route, open a
     * connection if the entry has none, perform a single GET and release the
     * entry. Failures end the operation early; whenever an entry was obtained it
     * is released, as non-reusable on any error.
     *
     * @param s shared benchmark state
     */
    @Benchmark
    @Threads(50)
    public void lease_io_release(final BenchState s) {
        final String key = s.pickRoute();
        final Future<PoolEntry<String, RealConn>> f = s.pool.lease(key, null, Timeout.DISABLED, null);
        final PoolEntry<String, RealConn> e;
        try {
            e = f.get(s.requestTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException te) {
            // IMPORTANT: drop waiter on pools that queue
            abandonLease(s, f);
            return;
        } catch (final ExecutionException ee) {
            // The future has already completed exceptionally: nothing was leased
            // and there is nothing left to cancel.
            return;
        } catch (final InterruptedException ie) {
            // The request is still pending; without cancelling it a later
            // completion would lease an entry that nobody releases.
            abandonLease(s, f);
            Thread.currentThread().interrupt();
            return;
        }
        if (e == null) {
            return; // defensive
        }

        RealConn c = e.getConnection();
        if (c == null) {
            final RealConn fresh = openConnection(s, key);
            if (fresh == null) {
                s.pool.release(e, false);
                return;
            }
            try {
                e.assignConnection(fresh);
                c = fresh; // ownership transferred to the pool entry
            } catch (final IllegalStateException already) {
                // The entry already holds a connection: use that one and close
                // ours exactly once.
                fresh.close(CloseMode.IMMEDIATE);
                c = e.getConnection();
                if (c == null) {
                    s.pool.release(e, false);
                    return;
                }
            }
        }

        try {
            c.getOnce(s.keepAlive);
        } catch (final IOException ioe) {
            s.pool.release(e, false);
            return;
        }

        if (s.keepAlive && !s.shouldDiscard()) {
            e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs));
            s.pool.release(e, true);
        } else {
            s.pool.release(e, false);
        }
    }

    /**
     * Gives up on a lease request. If the request can no longer be cancelled
     * because it completed in the meantime, the leased entry is handed straight
     * back to the pool so that it is not lost.
     */
    private static void abandonLease(final BenchState s, final Future<PoolEntry<String, RealConn>> f) {
        if (f.cancel(true) || f.isCancelled() || !f.isDone()) {
            return;
        }
        try {
            // The future is done, so this call does not wait.
            final PoolEntry<String, RealConn> late = f.get();
            if (late != null) {
                s.pool.release(late, late.getConnection() != null);
            }
        } catch (final ExecutionException ignored) {
            // the lease failed, so there is no entry to return
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Parses a {@code host:port} route key and opens a connection to it.
     *
     * @return the new connection, or {@code null} if the key is malformed or the
     *         connection attempt fails with an {@link IOException}
     */
    private static RealConn openConnection(final BenchState s, final String key) {
        final int colon = key.indexOf(':');
        if (colon <= 0 || colon >= key.length() - 1) {
            return null;
        }
        final String host = key.substring(0, colon);
        try {
            final int port = Integer.parseInt(key.substring(colon + 1));
            return new RealConn(host, port, s.closeSleepMs, s.soTimeoutMs, s.connectTimeoutMs);
        } catch (final NumberFormatException | IOException ex) {
            return null;
        }
    }

    // ---------------------------------------------------------------
    // Local HTTP mini-cluster
    // ---------------------------------------------------------------
    /**
     * A set of local {@link HttpServer} instances, one per route, each bound to an
     * ephemeral port on 127.0.0.1 and serving the same fixed body. All servers
     * share one bounded executor.
     */
    static final class MiniCluster {
        private final List<HttpServer> servers = new ArrayList<>();
        private final String[] keys;
        private final byte[] body;
        private final ExecutorService exec;

        /**
         * Starts {@code n} servers.
         *
         * @param n            number of servers to start
         * @param payloadBytes size of the response body
         * @throws IOException if a server cannot be created; servers started so
         *                     far and the executor are shut down first
         */
        MiniCluster(final int n, final int payloadBytes) throws IOException {
            this.keys = new String[n];
            this.body = new byte[payloadBytes];
            // Bounded, CPU-sized pool to keep the com.sun server in check
            final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
            final int coreThreads = Math.min(64, Math.max(cores, n * 2));
            final int maxThreads = Math.min(128, Math.max(coreThreads, n * 4));
            this.exec = new java.util.concurrent.ThreadPoolExecutor(
                    coreThreads, maxThreads,
                    60L, TimeUnit.SECONDS,
                    new java.util.concurrent.LinkedBlockingQueue<>(2048),
                    daemonFactory("mini-http"),
                    new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
            try {
                // Route keys are written as "127.0.0.1:<port>", so bind to exactly that
                // address; InetAddress.getLoopbackAddress() may return ::1 instead.
                final InetAddress loopback = InetAddress.getByAddress(new byte[] {127, 0, 0, 1});
                for (int i = 0; i < n; i++) {
                    final HttpServer s = HttpServer.create(new InetSocketAddress(loopback, 0), 4096);
                    // Track the server as soon as it owns a bound socket so that
                    // close() releases it even if a later step fails.
                    servers.add(s);
                    s.createContext("/", new FixedHandler(body));
                    s.setExecutor(exec);
                    s.start();
                    keys[i] = "127.0.0.1:" + s.getAddress().getPort();
                }
            } catch (final IOException | RuntimeException ex) {
                close();
                throw ex;
            }
        }

        /** Returns the {@code host:port} keys of all servers, in creation order. */
        String[] routeKeys() {
            return keys;
        }

        /**
         * Stops every server without delay, then shuts the shared executor down
         * and waits up to five seconds for it to terminate.
         */
        void close() {
            for (final HttpServer s : servers) {
                try {
                    s.stop(0);
                } catch (final Exception ignored) {
                    // keep going so the remaining servers are stopped as well
                }
            }
            exec.shutdownNow();
            try {
                exec.awaitTermination(5, TimeUnit.SECONDS);
            } catch (final InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Handler that drains the request body and answers every request with
     * status 200 and the same fixed payload.
     */
    static final class FixedHandler implements HttpHandler {
        private final byte[] body;

        /**
         * @param body response payload; the array is used as-is, not copied
         */
        FixedHandler(final byte[] body) {
            this.body = body;
        }

        /**
         * Consumes the request body, then sends a 200 response with an explicit
         * content length. The exchange is always closed, including on failure.
         *
         * @throws IOException if reading the request or writing the response fails
         */
        @Override
        public void handle(final HttpExchange ex) throws IOException {
            try {
                try (InputStream in = ex.getRequestBody()) {
                    final byte[] buf = new byte[1024];
                    while (in.read(buf) != -1) {
                        // drain
                    }
                }
                ex.getResponseHeaders().set("Content-Type", "text/plain; charset=US-ASCII");
                // For sendResponseHeaders a length of 0 selects chunked encoding,
                // while -1 means "no response body". An empty payload therefore has
                // to be announced as -1, otherwise the response carries no
                // Content-Length, which the benchmark client requires.
                final long declaredLength = body.length > 0 ? body.length : -1L;
                ex.sendResponseHeaders(200, declaredLength);
                try (OutputStream os = ex.getResponseBody()) {
                    if (body.length > 0) {
                        os.write(body);
                    }
                    os.flush();
                }
            } finally {
                ex.close();
            }
        }
    }
}