DestinationConnectionPool.java

/*
 * Copyright (c) 2015, 2021 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.jdk.connector.internal;

import java.io.IOException;
import java.net.CookieManager;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ScheduledExecutorService;

/**
 * @author Petr Janouch
 */
class DestinationConnectionPool {

    private final ConnectorConfiguration configuration;
    private final Queue<HttpConnection> idleConnections = new ConcurrentLinkedDeque<>();
    private final Set<HttpConnection> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Queue<RequestRecord> pendingRequests = new ConcurrentLinkedDeque<>();
    private final Map<HttpConnection, RequestRecord> requestsInProgress = new ConcurrentHashMap<>();
    private final CookieManager cookieManager;
    private final ScheduledExecutorService scheduler;
    private final ConnectionStateListener connectionStateListener;

    private volatile ConnectionCloseListener connectionCloseListener;

    private int connectionCounter = 0;
    private boolean closed = false;

    DestinationConnectionPool(ConnectorConfiguration configuration,
                              CookieManager cookieManager,
                              ScheduledExecutorService scheduler) {
        this.configuration = configuration;
        this.cookieManager = cookieManager;
        this.scheduler = scheduler;
        this.connectionStateListener = new ConnectionStateListener();
    }

    void setConnectionCloseListener(ConnectionCloseListener connectionCloseListener) {
        this.connectionCloseListener = connectionCloseListener;
    }

    void send(HttpRequest httpRequest, CompletionHandler<HttpResponse> completionHandler) {
        pendingRequests.add(new RequestRecord(httpRequest, completionHandler));
        processPendingRequests();
    }

    private void processPendingRequests(HttpConnection connection) {
        HttpRequest httpRequest;
        CompletionHandler<HttpResponse> completionHandler;

        synchronized (this) {
        /* this is synchronized so that another thread does not steal the pending request at the head of the queue
           while we investigate if we can execute it. */
            RequestRecord pendingHead = pendingRequests.poll();
            if (pendingHead == null) {

                idleConnections.add(connection);

                // no pending requests
                return;
            }

            httpRequest = pendingHead.request;
            completionHandler = pendingHead.completionHandler;
        }

        // if there was a connection available just use it
        requestsInProgress.put(connection, new RequestRecord(httpRequest, completionHandler));
        connection.send(httpRequest);
    }

    private void processPendingRequests() {
        HttpConnection connection;
        HttpRequest httpRequest;
        CompletionHandler<HttpResponse> completionHandler;

        synchronized (this) {
            /* this is synchronized so that another thread does not steal the pending request at the head of the queue
            while we investigate if we can execute it. */
            RequestRecord pendingHead = pendingRequests.peek();
            if (pendingHead == null) {
                // no pending requests
                return;
            }

            httpRequest = pendingHead.request;
            completionHandler = pendingHead.completionHandler;

            connection = idleConnections.poll();
            if (connection != null) {
                pendingRequests.poll();
            }
        }

        if (connection != null) {
            // if there was a connection available just use it
            requestsInProgress.put(connection, new RequestRecord(httpRequest, completionHandler));
            connection.send(httpRequest);
            return;
        }

        // if there was not a connection available keep this requests in pending list and try to create a connection
        synchronized (this) {
            // synchronized because other thread might open/close connections, so we have to make sure we get the limits right.

            if (configuration.getMaxConnectionsPerDestination() == connectionCounter) {
                // we are at the limit for this destination, just wait for a connection to become idle or close
                return;
            }

            // create a connection
            connection = new HttpConnection(httpRequest.getUri(), cookieManager, configuration, scheduler,
                    connectionStateListener);
            connections.add(connection);
            connectionCounter++;
        }

        // we don't want to connect inside the synchronized block
        connection.connect();
    }

    synchronized void close() {
        if (closed) {
            return;
        }

        closed = true;

        connections.forEach(HttpConnection::close);
    }

    private RequestRecord getRequest(HttpConnection connection) {
        RequestRecord requestRecord = requestsInProgress.get(connection);
        if (requestRecord == null) {
            throw new IllegalStateException("Request not found");
        }

        return requestRecord;
    }

    private RequestRecord removeRequest(HttpConnection connection) {
        RequestRecord requestRecord = requestsInProgress.get(connection);
        if (requestRecord == null) {
            throw new IllegalStateException("Request not found");
        }

        return requestRecord;
    }

    private void cleanClosedConnection(HttpConnection connection) {
        if (closed) {
            return;
        }

        RequestRecord pendingRequest;
        synchronized (this) {
            idleConnections.remove(connection);
            connections.remove(connection);
            connectionCounter--;

            pendingRequest = pendingRequests.peek();
            if (pendingRequest == null) {
                if (connectionCounter == 0) {
                    connectionCloseListener.onLastConnectionClosed();
                }
                return;
            }
        }

        processPendingRequests();
    }

    private void handleIllegalStateTransition(HttpConnection.State oldState, HttpConnection.State newState) {
        throw new IllegalStateException("Illegal state transition, old state: " + oldState + " new state: " + newState);
    }

    private void removeAllPendingWithError(Throwable t) {
        RequestRecord requestRecord = null;
        while ((requestRecord = pendingRequests.poll()) != null) {
            requestRecord.completionHandler.failed(t);
        }
    }

    private class ConnectionStateListener implements HttpConnection.StateChangeListener {

        @Override
        public void onStateChanged(HttpConnection connection, HttpConnection.State oldState, HttpConnection.State newState) {
            switch (newState) {

                case IDLE: {
                    switch (oldState) {
                        case RECEIVED:
                        case CONNECTING: {
                            processPendingRequests(connection);
                            return;
                        }

                        default: {
                            handleIllegalStateTransition(oldState, newState);
                            return;
                        }
                    }
                }

                case RECEIVED: {
                    switch (oldState) {
                        case RECEIVING_HEADER: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler.completed(connection.getHttResponse());
                            return;
                        }

                        case RECEIVING_BODY: {
                            removeRequest(connection);
                            return;
                        }

                        default: {
                            handleIllegalStateTransition(oldState, newState);
                            return;
                        }
                    }
                }

                case RECEIVING_BODY: {
                    switch (oldState) {
                        case RECEIVING_HEADER: {
                            RequestRecord request = getRequest(connection);
                            request.response = connection.getHttResponse();
                            request.completionHandler.completed(connection.getHttResponse());
                            return;
                        }

                        default: {
                            handleIllegalStateTransition(oldState, newState);
                            return;
                        }
                    }
                }

                case ERROR: {
                    switch (oldState) {
                        case SENDING_REQUEST: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler.failed(connection.getError());
                            return;
                        }

                        case RECEIVING_HEADER: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler.failed(connection.getError());
                            return;
                        }

                        case RECEIVING_BODY: {
                            requestsInProgress.remove(connection);
                            return;
                        }

                        case CONNECTING: {
                            removeAllPendingWithError(connection.getError());
                            return;
                        }

                        default: {
                            connection.getError().printStackTrace();
                            handleIllegalStateTransition(oldState, newState);
                            return;
                        }
                    }
                }

                case RESPONSE_TIMEOUT: {
                    switch (oldState) {
                        case RECEIVING_HEADER: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler
                                    .failed(new IOException(LocalizationMessages.TIMEOUT_RECEIVING_RESPONSE()));
                            return;
                        }

                        case RECEIVING_BODY: {
                            RequestRecord request = requestsInProgress.remove(connection);
                            request.response.getBodyStream()
                                    .notifyError(new IOException(LocalizationMessages.TIMEOUT_RECEIVING_RESPONSE_BODY()));
                            return;
                        }

                        default: {
                            handleIllegalStateTransition(oldState, newState);
                            return;
                        }
                    }
                }

                case CLOSED_BY_SERVER: {
                    switch (oldState) {
                        case SENDING_REQUEST: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler
                                    .failed(new IOException(LocalizationMessages.CLOSED_WHILE_SENDING_REQUEST()));
                            return;
                        }

                        case RECEIVING_HEADER: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler
                                    .failed(new IOException(LocalizationMessages.CLOSED_WHILE_RECEIVING_RESPONSE(),
                                            connection.getError()));
                            return;
                        }

                        case RECEIVING_BODY: {
                            RequestRecord request = requestsInProgress.remove(connection);
                            request.response.getBodyStream().notifyError(
                                    new IOException(LocalizationMessages.CLOSED_WHILE_RECEIVING_BODY(),
                                            connection.getError()));
                            return;
                        }

                        case CONNECTING: {
                            removeAllPendingWithError(new IOException(LocalizationMessages.CONNECTION_CLOSED()));
                            return;
                        }
                    }
                }

                case CLOSED: {
                    switch (oldState) {
                        case SENDING_REQUEST: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler
                                    .failed(new IOException(LocalizationMessages.CLOSED_BY_CLIENT_WHILE_SENDING()));
                            cleanClosedConnection(connection);
                            return;
                        }

                        case RECEIVING_HEADER: {
                            RequestRecord request = removeRequest(connection);
                            request.completionHandler
                                    .failed(new IOException(LocalizationMessages.CLOSED_WHILE_RECEIVING_RESPONSE()));
                            cleanClosedConnection(connection);
                            return;
                        }

                        case RECEIVING_BODY: {
                            RequestRecord request = requestsInProgress.remove(connection);
                            request.response.getBodyStream().notifyError(
                                    new IOException(LocalizationMessages.CLOSED_BY_CLIENT_WHILE_RECEIVING_BODY(),
                                            connection.getError()));
                            cleanClosedConnection(connection);
                            return;
                        }

                        default: {
                            cleanClosedConnection(connection);
                            return;
                        }
                    }
                }

                case CONNECT_TIMEOUT: {
                    switch (oldState) {
                        case CONNECTING: {
                            removeAllPendingWithError(new IOException(LocalizationMessages.CONNECTION_TIMEOUT()));
                            return;
                        }

                        default: {
                            cleanClosedConnection(connection);
                        }
                    }
                }
            }
        }
    }

    private static class RequestRecord {

        private final HttpRequest request;
        private final CompletionHandler<HttpResponse> completionHandler;
        private HttpResponse response;

        RequestRecord(HttpRequest request, CompletionHandler<HttpResponse> completionHandler) {
            this.request = request;
            this.completionHandler = completionHandler;
        }
    }

    static class DestinationKey {

        private final String host;
        private final int port;
        private final boolean secure;

        DestinationKey(URI uri) {
            host = uri.getHost();
            port = Utils.getPort(uri);
            secure = Constants.HTTPS.equalsIgnoreCase(uri.getScheme());
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            DestinationKey that = (DestinationKey) o;

            return port == that.port && secure == that.secure && host.equals(that.host);
        }

        @Override
        public int hashCode() {
            int result = host.hashCode();
            result = 31 * result + port;
            result = 31 * result + (secure ? 1 : 0);
            return result;
        }
    }

    interface ConnectionCloseListener {

        void onLastConnectionClosed();
    }
}