HttpConnection.java

/*
 * Copyright (c) 2015, 2023 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.jdk.connector.internal;

import java.io.IOException;
import java.net.CookieManager;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLContext;

import org.glassfish.jersey.SslConfigurator;

/**
 * @author Petr Janouch
 */
class HttpConnection {

    /**
     * Input buffer that is used by {@link TransportFilter} when SSL is turned on.
     * The size cannot be smaller than a maximal size of a SSL packet, which is 16kB for payload + header, because
     * {@link SslFilter} does not have its own buffer for buffering incoming
     * data and therefore the entire SSL packet must fit into {@link SslFilter}
     * input buffer.
     * <p/>
     */
    private static final int SSL_INPUT_BUFFER_SIZE = 17_000;
    /**
     * Input buffer that is used by {@link TransportFilter} when SSL is not turned on.
     */
    private static final int INPUT_BUFFER_SIZE = 2048;

    private static final Logger LOGGER = Logger.getLogger(HttpConnection.class.getName());

    private final Filter<HttpRequest, HttpResponse, HttpRequest, HttpResponse> filterChain;
    private final CookieManager cookieManager;
    // we are interested only in host-port pair, but URI is a convenient holder for it
    private final URI uri;
    private final StateChangeListener stateListener;
    private final ScheduledExecutorService scheduler;
    private final ConnectorConfiguration configuration;

    private HttpRequest httpRequest;
    private HttpResponse httResponse;
    private Throwable error;
    volatile State state = State.CREATED;

    // by default we treat all connection as persistent
    // this flag will change to false if we receive "Connection: Close" header
    private boolean persistentConnection = true;

    private Future<?> responseTimeout;
    private Future<?> idleTimeout;
    private Future<?> connectTimeout;

    HttpConnection(URI uri,
                   CookieManager cookieManager,
                   ConnectorConfiguration configuration,
                   ScheduledExecutorService scheduler,
                   StateChangeListener stateListener) {
        this.uri = uri;
        this.cookieManager = cookieManager;
        this.stateListener = stateListener;
        this.configuration = configuration;
        this.scheduler = scheduler;
        filterChain = createFilterChain(uri, configuration);
    }

    synchronized void connect() {
        if (state != State.CREATED) {
            throw new IllegalStateException(LocalizationMessages.HTTP_CONNECTION_ESTABLISHING_ILLEGAL_STATE(state));
        }
        changeState(State.CONNECTING);
        scheduleConnectTimeout();
        filterChain.connect(new InetSocketAddress(uri.getHost(), Utils.getPort(uri)), null);
    }

    synchronized void send(final HttpRequest httpRequest) {
        if (state != State.IDLE) {
            throw new IllegalStateException(
                    "Http request cannot be sent over a connection that is in other state than IDLE. Current state: " + state);
        }

        cancelIdleTimeout();

        this.httpRequest = httpRequest;
        // clean state left by previous request
        httResponse = null;
        error = null;
        persistentConnection = true;
        changeState(State.SENDING_REQUEST);

        addRequestHeaders();

        filterChain.write(httpRequest, new CompletionHandler<HttpRequest>() {
            @Override
            public void failed(Throwable throwable) {
                handleError(throwable);
            }

            @Override
            public void completed(HttpRequest result) {
                handleHeaderSent();
            }
        });
    }

    void close() {
        if (state == State.CLOSED) {
            return;
        }

        cancelAllTimeouts();
        filterChain.close();
        changeState(State.CLOSED);
    }

    private synchronized void handleHeaderSent() {
        if (state != State.SENDING_REQUEST) {
            return;
        }

        scheduleResponseTimeout();

        if (httpRequest.getBodyMode() == HttpRequest.BodyMode.NONE
                || httpRequest.getBodyMode() == HttpRequest.BodyMode.BUFFERED) {
            changeState(State.RECEIVING_HEADER);
        } else {
            ChunkedBodyOutputStream bodyStream = (ChunkedBodyOutputStream) httpRequest.getBodyStream();
            bodyStream.setCloseListener(() -> {
                synchronized (HttpConnection.this) {
                    if (state != State.SENDING_REQUEST) {
                        return;
                    }
                }
                changeState(State.RECEIVING_HEADER);
            });
        }
    }

    private void addRequestHeaders() {
        Map<String, List<String>> cookies;
        try {
            cookies = cookieManager.get(httpRequest.getUri(), httpRequest.getHeaders());
        } catch (IOException e) {
            handleError(e);
            return;
        }

        // unfortunately CookieManager returns ""Cookie" -> empty list" pair if the cookie is not set
        cookies.entrySet().stream().filter(cookieHeader -> cookieHeader.getValue() != null && !cookieHeader.getValue().isEmpty())
                .forEach(cookieHeader -> httpRequest.getHeaders().put(cookieHeader.getKey(), cookieHeader.getValue()));
    }

    private void processResponseHeaders(HttpResponse response) throws IOException {
        cookieManager.put(httpRequest.getUri(), httResponse.getHeaders());
        List<String> connectionValues = response.getHeader(Constants.CONNECTION);
        if (connectionValues != null) {
            connectionValues.stream().filter(connectionValue -> connectionValue.equalsIgnoreCase(Constants.CONNECTION_CLOSE))
                    .forEach(connectionValue -> persistentConnection = false);
        }
    }

    protected Filter<HttpRequest, HttpResponse, HttpRequest, HttpResponse> createFilterChain(URI uri,
                                                                                             ConnectorConfiguration
                                                                                                     configuration) {
        boolean secure = Constants.HTTPS.equals(uri.getScheme());
        Filter<ByteBuffer, ByteBuffer, ?, ?> socket;

        if (secure) {
            SSLContext sslContext = configuration.getSslContext();
            TransportFilter transportFilter = new TransportFilter(SSL_INPUT_BUFFER_SIZE, configuration.getThreadPoolConfig(),
                    configuration.getContainerIdleTimeout());

            if (sslContext == null) {
                sslContext = SslConfigurator.getDefaultContext();

            }

            socket = new SslFilter(transportFilter, sslContext, uri.getHost(),
                            configuration.getHostnameVerifier(), configuration.getSniConfig());
        } else {
            socket = new TransportFilter(INPUT_BUFFER_SIZE, configuration.getThreadPoolConfig(),
                    configuration.getContainerIdleTimeout());
        }

        int maxHeaderSize = configuration.getMaxHeaderSize();
        HttpFilter httpFilter = new HttpFilter(socket, maxHeaderSize, maxHeaderSize + INPUT_BUFFER_SIZE);

        ConnectorConfiguration.ProxyConfiguration proxyConfiguration = configuration.getProxyConfiguration();
        if (proxyConfiguration.isConfigured()) {
            ProxyFilter proxyFilter = new ProxyFilter(httpFilter, proxyConfiguration);
            return new ConnectionFilter(proxyFilter);
        }

        return new ConnectionFilter(httpFilter);
    }

    private void changeState(State newState) {
        if (state == State.CLOSED) {
            return;
        }
        State old = state;
        state = newState;

        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.finest(LocalizationMessages.CONNECTION_CHANGING_STATE(uri.getHost(), uri.getPort(), old, newState));
        }

        stateListener.onStateChanged(this, old, newState);
    }

    private void scheduleResponseTimeout() {
        if (configuration.getResponseTimeout() == 0) {
            return;
        }

        responseTimeout = scheduler.schedule(() -> {
            synchronized (HttpConnection.this) {
                if (state != State.RECEIVING_HEADER && state != State.RECEIVING_BODY) {
                    return;
                }

                responseTimeout = null;
                changeState(State.RESPONSE_TIMEOUT);
                close();
            }
        }, configuration.getResponseTimeout(), TimeUnit.MILLISECONDS);
    }

    private void cancelResponseTimeout() {
        if (responseTimeout != null) {
            responseTimeout.cancel(true);
            responseTimeout = null;
        }
    }

    private void scheduleConnectTimeout() {
        if (configuration.getConnectTimeout() == 0) {
            return;
        }

        connectTimeout = scheduler.schedule(() -> {
            synchronized (HttpConnection.this) {
                if (state != State.CONNECTING) {
                    return;
                }

                connectTimeout = null;
                changeState(State.CONNECT_TIMEOUT);
                close();
            }
        }, configuration.getConnectTimeout(), TimeUnit.MILLISECONDS);
    }

    private void cancelConnectTimeout() {
        if (connectTimeout != null) {
            connectTimeout.cancel(true);
            connectTimeout = null;
        }
    }

    private void scheduleIdleTimeout() {
        if (configuration.getConnectionIdleTimeout() == 0) {
            return;
        }

        idleTimeout = scheduler.schedule(() -> {
            synchronized (HttpConnection.this) {
                if (state != State.IDLE) {
                    return;
                }
                idleTimeout = null;
                changeState(State.IDLE_TIMEOUT);
                close();
            }
        }, configuration.getConnectionIdleTimeout(), TimeUnit.MILLISECONDS);
    }

    private void cancelIdleTimeout() {
        if (idleTimeout != null) {
            idleTimeout.cancel(true);
            idleTimeout = null;
        }
    }

    private void cancelAllTimeouts() {
        cancelConnectTimeout();
        cancelIdleTimeout();
        cancelResponseTimeout();
    }

    private synchronized void handleError(Throwable t) {
        cancelAllTimeouts();
        error = t;
        changeState(State.ERROR);
        close();
    }

    private void changeStateToIdle() {
        scheduleIdleTimeout();
        changeState(State.IDLE);
    }

    Throwable getError() {
        return error;
    }

    HttpResponse getHttResponse() {
        return httResponse;
    }

    private synchronized void handleResponseRead() {
        cancelResponseTimeout();
        changeState(State.RECEIVED);
        if (!persistentConnection) {
            changeState(State.CLOSED);
            return;
        }
        changeStateToIdle();
    }

    private class ConnectionFilter extends Filter<HttpRequest, HttpResponse, HttpRequest, HttpResponse> {

        ConnectionFilter(Filter<HttpRequest, HttpResponse, ?, ?> downstreamFilter) {
            super(downstreamFilter);
        }

        @Override
        boolean processRead(HttpResponse response) {
            synchronized (HttpConnection.this) {
                if (state != State.RECEIVING_HEADER && state != State.SENDING_REQUEST) {
                    return false;
                }

                if (state == State.SENDING_REQUEST) {
                    // great we received response header so fast that we did not even switch into "receiving header" state,
                    // do it now to complete the formal lifecycle
                    // this happens when write completion listener is overtaken by "read event"
                    changeState(State.RECEIVING_HEADER);
                }

                httResponse = response;

                try {
                    processResponseHeaders(response);
                } catch (IOException e) {
                    handleError(e);
                    return false;
                }
            }

            if (response.getHasContent()) {
                AsynchronousBodyInputStream bodyStream = httResponse.getBodyStream();
                changeState(State.RECEIVING_BODY);
                bodyStream.setStateChangeLister(new AsynchronousBodyInputStream.StateChangeLister() {
                    @Override
                    public void onError(Throwable t) {
                        handleError(t);
                    }

                    @Override
                    public void onAllDataRead() {
                        handleResponseRead();
                    }
                });

            } else {
                handleResponseRead();
            }
            return false;
        }

        @Override
        void processConnect() {
            synchronized (HttpConnection.this) {
                if (state != State.CONNECTING) {
                    return;
                }

                downstreamFilter.startSsl();
            }
        }

        @Override
        void processSslHandshakeCompleted() {
            synchronized (HttpConnection.this) {
                if (state != State.CONNECTING) {
                    return;
                }

                cancelConnectTimeout();
                changeStateToIdle();
            }
        }

        @Override
        void processConnectionClosed() {
            synchronized (HttpConnection.this) {
                cancelAllTimeouts();
                changeState(State.CLOSED_BY_SERVER);
                HttpConnection.this.close();
            }
        }

        @Override
        void processError(Throwable t) {
            handleError(t);
        }

        @Override
        void write(HttpRequest data, CompletionHandler<HttpRequest> completionHandler) {
            downstreamFilter.write(data, completionHandler);
        }
    }

    enum State {
        CREATED,
        CONNECTING,
        CONNECT_TIMEOUT,
        IDLE,
        SENDING_REQUEST,
        RECEIVING_HEADER,
        RECEIVING_BODY,
        RECEIVED,
        RESPONSE_TIMEOUT,
        CLOSED_BY_SERVER,
        CLOSED,
        ERROR,
        IDLE_TIMEOUT
    }

    interface StateChangeListener {

        void onStateChanged(HttpConnection connection, State oldState, State newState);
    }
}