HttpClientHTTPConduit.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cxf.transport.http;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PushbackInputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.security.AccessController;
import java.security.GeneralSecurityException;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSession;

import org.apache.cxf.Bus;
import org.apache.cxf.common.util.PropertyUtils;
import org.apache.cxf.configuration.jsse.TLSClientParameters;
import org.apache.cxf.helpers.HttpHeaderHelper;
import org.apache.cxf.helpers.JavaUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CacheAndWriteOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.policy.impl.ClientPolicyCalculator;
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
import org.apache.cxf.transport.https.SSLUtils;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.EndpointReferenceType;


public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
    private static final String FORCE_URLCONNECTION_HTTP_CONDUIT = "force.urlconnection.http.conduit";
    private static final String SHARE_HTTPCLIENT_CONDUIT = "share.httpclient.http.conduit";
    private static final String HTTPS_RESET_HTTPCLIENT_CONDUIT = "https.reset.httpclient.http.conduit";

    private static final Set<String> RESTRICTED_HEADERS = getRestrictedHeaders();
    private static final HttpClientCache CLIENTS_CACHE = new HttpClientCache();
    volatile RefCount<HttpClient> clientRef;
    volatile URI sslURL;
    private final ReentrantLock initializationLock = new ReentrantLock();
    private final Queue<RefCount<HttpClient>> deferredClientRefs = new ConcurrentLinkedQueue<>();

    private static final class RefCount<T extends HttpClient> {
        private final AtomicLong count = new AtomicLong();
        private final TLSClientParameters clientParameters;
        private final HTTPClientPolicy policy;
        private final T client;
        private final Runnable finalizer;

        RefCount(T client, HTTPClientPolicy policy, TLSClientParameters clientParameters, Runnable finalizer) {
            this.client = client;
            this.policy = policy;
            this.clientParameters = clientParameters;
            this.finalizer = finalizer;
        }

        RefCount<T> acquire() {
            count.incrementAndGet();
            return this;
        }

        void release() {
            if (count.decrementAndGet() == 0) {
                finalizer.run();

                if (client instanceof AutoCloseable) {
                    try {
                        // The HttpClient::close may hang during the termination.
                        try {
                            // Try to call shutdownNow() first
                            AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
                                try {
                                    MethodHandles.publicLookup()
                                        .findVirtual(HttpClient.class, "shutdownNow", MethodType.methodType(void.class))
                                        .bindTo(client)
                                        .invokeExact();
                                    return null;
                                } catch (final Throwable ex) {
                                    if (ex instanceof Error) {
                                        throw (Error) ex;
                                    } else {
                                        throw (Exception) ex;
                                    }
                                }
                            });
                        } catch (final PrivilegedActionException e) {
                            //ignore
                        }

                        ((AutoCloseable)client).close();
                    } catch (Exception e) {
                        //ignore
                    }
                } else if (client != null) {
                    tryToShutdownSelector(client);
                }
            }
        }

        HttpClient client() {
            return client;
        }

        HTTPClientPolicy policy() {
            return policy;
        }

        public TLSClientParameters clientParameters() {
            return clientParameters;
        }
    }

    private static final class HttpClientCache {
        private static final int MAX_SIZE = 100; // Keeping at most 100 clients

        private final List<RefCount<HttpClient>> clients = new ArrayList<>();
        private final ClientPolicyCalculator cpc = new ClientPolicyCalculator();
        private final ReentrantLock lock = new ReentrantLock();

        RefCount<HttpClient> computeIfAbsent(final boolean shareHttpClient, final HTTPClientPolicy policy,
                final TLSClientParameters clientParameters, final Supplier<HttpClient> supplier) {

            // Do not share if it is not allowed for the conduit or cache capacity is exceeded
            if (!shareHttpClient || clients.size() >= MAX_SIZE) {
                return new RefCount<HttpClient>(supplier.get(), policy, clientParameters, () -> { }).acquire();
            }

            lock.lock();
            try {
                for (final RefCount<HttpClient> p: clients) {
                    if (cpc.equals(p.policy(), policy) && p.clientParameters().equals(clientParameters)) {
                        return p.acquire();
                    }
                }

                final HttpClient client = supplier.get();
                final RefCount<HttpClient> clientRef = new RefCount<HttpClient>(client, policy, clientParameters,
                        () -> this.remove(policy, clientParameters));
                clients.add(clientRef);

                return clientRef.acquire();
            } finally {
                lock.unlock();
            }
        }

        void remove(final HTTPClientPolicy policy, final TLSClientParameters clientParameters) {
            lock.lock();
            try {
                final Iterator<RefCount<HttpClient>> iterator = clients.iterator();
                while (iterator.hasNext()) {
                    final RefCount<HttpClient> p = iterator.next();
                    if (cpc.equals(p.policy(), policy) && p.clientParameters().equals(clientParameters)) {
                        iterator.remove();
                        break;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }

    public HttpClientHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t) throws IOException {
        super(b, ei, t);
    }

    private static Set<String> getRestrictedHeaders() {
        Set<String> headers = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        headers.addAll(Set.of("Connection", "Content-Length", "Expect", "Host", "Upgrade"));
        return headers;
    }

    private boolean isSslTargetDifferent(URI lastURL, URI url) {
        return !lastURL.getScheme().equals(url.getScheme())
                || !lastURL.getHost().equals(url.getHost())
                || lastURL.getPort() != url.getPort();
    }

    @Override
    public void close(Message msg) throws IOException {
        try {
            OutputStream os = msg.getContent(OutputStream.class);
            // Java 21 may hang on close, we flush stream to help close them out.
            if (os != null && AutoCloseable.class.isAssignableFrom(HttpClient.class)) {
                os.flush();
            }
        } catch (IOException ioException) {
            // ignore
        }
        super.close(msg);
        msg.remove(HttpClient.class);
    }

    /**
     * Close the conduit
     */
    public void close() {
        if (clientRef != null) {
            clientRef.release();
            clientRef = null;
        }
        deferredClientRefs.forEach(RefCount::release);
        deferredClientRefs.clear();
        defaultAddress = null;
        super.close();
    }
    private static void tryToShutdownSelector(HttpClient client) {
        synchronized (client) {
            String n = client.toString();

            // it can take three seconds (or more) for the JVM to determine the client
            // is unreferenced and then shutdown the selector thread, we'll try and speed that
            // up.  This is somewhat of a complete hack.
            int idx = n.lastIndexOf('(');
            if (idx > 0) {
                n = n.substring(idx + 1);
                n = n.substring(0, n.length() - 1);
                n = "HttpClient-" + n + "-SelectorManager";
            }
            try {
                ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
                Thread[] threads = new Thread[rootGroup.activeCount()];
                int cnt = rootGroup.enumerate(threads);
                for (int x = 0; x < cnt; x++) {
                    if (threads[x].getName().contains(n)) {
                        final int index = x;
                        AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                            threads[index].interrupt();
                            return null;
                        });
                    }
                }
            } catch (Throwable t) {
                //ignore, nothing we can do except wait for the garbage collection
                //and then the three seconds for the timeout
            }
        }
    }

    @Override
    protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy) throws IOException {
        URI uri = address.getURI();
        message.put("http.scheme", uri.getScheme());
        // check tlsClientParameters from message header
        TLSClientParameters clientParameters = message.get(TLSClientParameters.class);
        if (clientParameters == null) {
            clientParameters = tlsClientParameters;
        }
        if (clientParameters == null) {
            clientParameters = new TLSClientParameters();
        }
        Object o = message.getContextualProperty(FORCE_URLCONNECTION_HTTP_CONDUIT);
        if (o == null) {
            o = message.get("USING_URLCONNECTION");
        }
        //o = true;
        if ("https".equals(uri.getScheme()) && clientParameters != null) {
            if (clientParameters.getSSLSocketFactory() != null) {
                //if they configured in an SSLSocketFactory, we cannot do anything
                //with it as the NIO based transport cannot use socket created from
                //the SSLSocketFactory.
                o = Boolean.TRUE;
            }
            if (clientParameters.isDisableCNCheck()) {
                if (clientParameters.getSslContext() != null) {
                    // If they specify their own SSLContext, we cannot handle the
                    // HostnameVerifier so we'll need to use the URLConnection
                    o = Boolean.TRUE;
                }
                if (clientParameters.getTrustManagers() != null
                    && JavaUtils.getJavaMajorVersion() < 14) {
                    // trustmanagers hacks don't work on Java11
                    o = Boolean.TRUE;
                }
            }
        }
        if (Boolean.TRUE.equals(o)) {
            message.put("USING_URLCONNECTION", Boolean.TRUE);
            super.setupConnection(message, address, csPolicy);
            return;
        }

        if (sslURL != null && isSslTargetDifferent(sslURL, uri)) {
            sslURL = null;

            // Reset the client in case of HTTPS URL change
            final boolean httpsResetHttpClient = MessageUtils.getContextualBoolean(message,
                HTTPS_RESET_HTTPCLIENT_CONDUIT, true);
            if (httpsResetHttpClient) {
                final RefCount<HttpClient> ref = clientRef;
                // Do not release client immediately since it could be in use, instead
                // move it off to deferred release queue.
                if (ref != null) {
                    deferredClientRefs.add(ref);
                    clientRef = null;
                }
            }
        }
        // If the HTTP_REQUEST_METHOD is not set, the default is "POST".
        String httpRequestMethod =
            (String)message.get(Message.HTTP_REQUEST_METHOD);
        if (httpRequestMethod == null) {
            httpRequestMethod = "POST";
            message.put(Message.HTTP_REQUEST_METHOD, "POST");
        }

        RefCount<HttpClient> cl = clientRef;
        if (cl == null) {
            int ctimeout = determineConnectionTimeout(message, csPolicy);
            ProxySelector ps = new ProxyFactoryProxySelector(proxyFactory, csPolicy);

            HttpClient.Builder cb = HttpClient.newBuilder()
                .proxy(ps)
                .followRedirects(Redirect.NEVER);

            if (ctimeout > 0) {
                cb.connectTimeout(Duration.ofMillis(ctimeout));
            }

            if ("https".equals(uri.getScheme())) {
                sslURL = uri;
                try {
                    SSLContext sslContext = clientParameters.getSslContext();
                    if (sslContext == null) {
                        sslContext = SSLUtils.getSSLContext(clientParameters, true);
                        cb.sslContext(sslContext);
                    }
                    if (sslContext != null) {
                        cb.sslContext(sslContext);
                        String[] supportedCiphers =  org.apache.cxf.configuration.jsse.SSLUtils
                                .getSupportedCipherSuites(sslContext);
                        String[] cipherSuites = org.apache.cxf.configuration.jsse.SSLUtils
                                .getCiphersuitesToInclude(clientParameters.getCipherSuites(),
                                                          clientParameters.getCipherSuitesFilter(),
                                                          sslContext.getSocketFactory().getDefaultCipherSuites(),
                                                          supportedCiphers,
                                                          LOG);

                        if (clientParameters.getSecureSocketProtocol() != null) {
                            String protocol = clientParameters.getSecureSocketProtocol();
                            SSLParameters params = new SSLParameters(cipherSuites, new String[] {protocol});
                            cb.sslParameters(params);
                        } else {
                            final SSLParameters params = new SSLParameters(cipherSuites,
                                TLSClientParameters.getPreferredClientProtocols());
                            cb.sslParameters(params);
                        }
                    }
                } catch (GeneralSecurityException e) {
                    throw new IOException(e);
                }
            }
            String verc = (String)message.getContextualProperty(FORCE_HTTP_VERSION);
            if (verc == null) {
                verc = csPolicy.getVersion();
            }
            if ("1.1".equals(HTTP_VERSION) || "1.1".equals(verc)) {
                cb.version(Version.HTTP_1_1);
            }

            // make sure the conduit is not yet initialized
            initializationLock.lock();
            try {
                cl = clientRef;
                if (cl == null) {
                    final boolean shareHttpClient = MessageUtils.getContextualBoolean(message,
                        SHARE_HTTPCLIENT_CONDUIT, true);
                    cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, clientParameters, () -> cb.build());

                    if (!"https".equals(uri.getScheme())
                        && !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
                        && cl.client().version() == Version.HTTP_2
                        && ("2".equals(verc) || ("auto".equals(verc) && "2".equals(HTTP_VERSION)))) {
                        try {
                            // We specifically want HTTP2, but we're using a request
                            // that won't trigger an upgrade to HTTP/2 so we'll
                            // call OPTIONS on the URI which may trigger HTTP/2 upgrade.
                            // Not needed for methods that don't have a body (GET/HEAD/etc...)
                            // or for https (negotiated at the TLS level)
                            HttpRequest.Builder rb = HttpRequest.newBuilder()
                                .uri(uri)
                                .method("OPTIONS", BodyPublishers.noBody());
                            cl.client().send(rb.build(), BodyHandlers.ofByteArray());
                        } catch (IOException | InterruptedException e) {
                            //
                        }
                    }

                    clientRef = cl;
                }
            } finally {
                initializationLock.unlock();
            }
        }
        message.put(HttpClient.class, cl.client());

        message.put(KEY_HTTP_CONNECTION_ADDRESS, address);
    }

    @Override
    protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking,
                                              int chunkThreshold)
        throws IOException {

        Object o = message.get("USING_URLCONNECTION");
        if (Boolean.TRUE == o) {
            return super.createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold);
        }
        return new HttpClientWrappedOutputStream(message,
                                                 needToCacheRequest,
                                                 isChunking,
                                                 chunkThreshold,
                                                 getConduitName());
    }


    /**
     * This class <i>must</i> be static so it doesn't capture a reference to {@code HttpClientHTTPConduit.this} and
     * through that to {@link HttpClientHTTPConduit#client}. Otherwise the client can never be garbage collected, which
     * means that the companion "SelectorManager" thread keeps running indefinitely (see CXF-8885).
     */
    private static final class ProxyFactoryProxySelector extends ProxySelector {
        private final ProxyFactory proxyFactory;
        private final HTTPClientPolicy csPolicy;

        ProxyFactoryProxySelector(ProxyFactory proxyFactory, HTTPClientPolicy csPolicy) {
            this.proxyFactory = proxyFactory;
            this.csPolicy = csPolicy;
        }

        @Override
        public List<Proxy> select(URI uri) {
            Proxy proxy = proxyFactory.createProxy(csPolicy, uri);
            if (proxy !=  null) {
                return Arrays.asList(proxy);
            }
            List<Proxy> listProxy;
            if (System.getSecurityManager() != null) {
                try {
                    listProxy = AccessController.doPrivileged(new PrivilegedExceptionAction<List<Proxy>>() {
                        @Override
                        public List<Proxy> run() throws IOException {
                            return ProxySelector.getDefault().select(uri);
                        }
                    });
                } catch (PrivilegedActionException e) {
                    throw new RuntimeException(e);
                }
            } else {
                listProxy = ProxySelector.getDefault().select(uri);
            }
            return listProxy;
        }

        @Override
        public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
        }
    }

    static class HttpClientPipedOutputStream extends PipedOutputStream {
        HttpClientWrappedOutputStream stream;
        HTTPClientPolicy csPolicy;
        CloseableBodyPublisher publisher;
        HttpClientPipedOutputStream(HttpClientWrappedOutputStream s, 
                                    PipedInputStream pin,
                                    HTTPClientPolicy cp,
                                    CloseableBodyPublisher bp) throws IOException {
            super(pin);
            stream = s;
            csPolicy = cp;
            publisher = bp;
        }
        public void close() throws IOException {
            super.close();
            csPolicy = null;
            stream = null;
            if (publisher != null) {
                publisher.close();
                publisher = null;
            }
        }
        synchronized boolean canWrite() throws IOException {
            return stream.isConnectionAttemptCompleted(csPolicy, this);
        }
        @Override
        public void write(int b) throws IOException {
            if (stream != null && (stream.connectionComplete || canWrite())) {
                super.write(b);
            }
        }
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (stream != null && (stream.connectionComplete || canWrite())) {
                super.write(b, off, len);
            }
        }

    };
    private static final class HttpClientFilteredInputStream extends FilterInputStream {
        boolean closed;

        private HttpClientFilteredInputStream(InputStream in) {
            super(in);
        }
        @Override
        public int read() throws IOException {
            if (closed) {
                throw new IOException("stream is closed");
            }
            return super.read();
        }

        @Override
        public int read(byte[] b) throws IOException {
            if (closed) {
                throw new IOException("stream is closed");
            }
            return super.read(b);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (closed) {
                throw new IOException("stream is closed");
            }
            return super.read(b, off, len);
        }

        @Override
        public void close() throws IOException {
            if (!closed) {
                closed = true;
                super.close();
                in = null;
            }
        }
    }

    /**
     * The interface for {@link BodyPublisher}s that implement {@link Closeable} as well.
     */
    private interface CloseableBodyPublisher extends BodyPublisher, Closeable {
    }
    
    /**
     * The {@link BodyPublisher} that wraps around the output stream.
     */
    private static final class HttpClientBodyPublisher implements CloseableBodyPublisher {
        private Supplier<InputStream> pin;
        private HttpClientWrappedOutputStream stream;
        private long contentLen;

        private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, Supplier<InputStream> pin) {
            this.stream = s;
            this.pin = pin;
        }

        public synchronized void close() {
            if (stream != null) {
                contentLen = stream.contentLen;
                stream = null;
            }
        }

        @Override
        public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            if (stream != null) {
                stream.connectionComplete = true;
                contentLen = stream.contentLen;
                if (stream.pout != null) {
                    synchronized (stream.pout) {
                        stream.pout.notifyAll();
                    }
                    if (stream != null) {
                        contentLen = stream.contentLen;
                    }
                    BodyPublishers.ofInputStream(pin).subscribe(subscriber);
                    stream = null;
                    pin = null;
                    return;
                }
            }
            BodyPublishers.noBody().subscribe(subscriber);
        }

        @Override
        public long contentLength() {
            if (stream != null) {
                contentLen = stream.contentLen;
            }
            return contentLen;
        }
    }

    /**
     * The {@link BodyPublisher} that awaits for the output stream to be fully flushed (closed) 
     * so the content length becomes known (sized). It is used when the chunked transfer is not allowed
     * but the content length is not specified up-front.
     */
    private static final class HttpClientSizedBodyPublisher implements CloseableBodyPublisher {
        private HTTPClientPolicy csPolicy;
        private Supplier<ByteArrayInputStream> pin;
        private HttpClientWrappedOutputStream stream;
        private long contentLen;

        private HttpClientSizedBodyPublisher(HttpClientWrappedOutputStream s, HTTPClientPolicy cs,
                Supplier<ByteArrayInputStream> pin) {
            this.stream = s;
            this.csPolicy = cs;
            this.pin = pin;
        }

        public synchronized void close() {
            if (stream != null) {
                contentLen = stream.contentLen;
                stream = null;
            }
        }

        @Override
        public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            if (stream != null) {
                stream.connectionComplete = true;
                if (stream.pout != null) {
                    synchronized (stream.pout) {
                        stream.pout.notifyAll();
                    }

                    BodyPublishers.ofInputStream(pin).subscribe(subscriber);
                    stream = null;
                    pin = null;
                    return;
                }
            }
            BodyPublishers.noBody().subscribe(subscriber);
        }

        @Override
        public long contentLength() {
            if (stream != null && stream.pout != null) {
                final CloseableByteArrayOutputStream baos = (CloseableByteArrayOutputStream) stream.pout;

                try {
                    synchronized (baos) {
                        if (!baos.closed) {
                            baos.wait(csPolicy.getConnectionTimeout());
                        }
                    }
                    contentLen = (int) baos.size();
                } catch (InterruptedException e) {
                    //ignore
                }
            }

            return contentLen;
        }
    }

    /**
     * The {@link ByteArrayOutputStream} implementation that tracks the closeability state.
     */
    private static final class CloseableByteArrayOutputStream extends ByteArrayOutputStream {
        private boolean closed;

        /**
         * Creates a new output stream for user data
         */
        CloseableByteArrayOutputStream() {
            super(4096);
        }

        /**
         * Writes the specified byte to this output stream.
         *
         * @param   b   the byte to be written.
         */
        public synchronized void write(int b) {
            if (closed) {
                return;
            }
            super.write(b);
        }

        /**
         * Writes <code>len</code> bytes from the specified byte array
         * starting at offset <code>off</code> to this output stream.
         *
         * @param   b     the data.
         * @param   off   the start offset in the data.
         * @param   len   the number of bytes to write.
         */
        public synchronized void write(byte[] b, int off, int len) {
            if (closed) {
                return;
            }
            super.write(b, off, len);
        }

        /**
         * Resets the <code>count</code> field of this output
         * stream to zero, so that all currently accumulated output in the
         * output stream is discarded. The output stream can be used again,
         * reusing the already allocated buffer space. If the output stream
         * has been closed, then this method has no effect.
         *
         * @see     java.io.ByteArrayInputStream#count
         */
        public synchronized void reset() {
            if (closed) {
                return;
            }
            super.reset();
        }

        /**
         * After close() has been called, it is no longer possible to write
         * to this stream. Further calls to write will have no effect.
         */
        public synchronized void close() throws IOException {
            closed = true;
            super.close();
            notifyAll();
        }

        /**
         * Returns new instance of the {@link ByteArrayInputStream} that uses the same underlying buffer as 
         * this stream. The steam must be closed in order to ensure no further modifications could happen. 
         * @return new instance of the {@link ByteArrayInputStream}
         */
        public ByteArrayInputStream getInputStream() {
            if (!closed) {
                throw new IllegalStateException("The stream is not closed and underlying buffer "
                        + "could still be changed");
            }

            // Creates new ByteArrayInputStream instance that respects the current state of the buffer 
            // (since ByteArrayInputStream::toByteArray() does array copy).
            return new ByteArrayInputStream(this.buf, 0, this.count);
        }
    }

    class HttpClientWrappedOutputStream extends WrappedOutputStream {  

        List<Flow.Subscriber<? super ByteBuffer>> subscribers = new LinkedList<>();
        CompletableFuture<HttpResponse<InputStream>> future;
        long contentLen = -1;
        int rtimeout;
        volatile Throwable exception;
        volatile boolean connectionComplete;
        OutputStream pout;
        CloseableBodyPublisher publisher;
        HttpRequest request;


        HttpClientWrappedOutputStream(Message message,
                                      boolean needToCacheRequest, boolean isChunking,
                                      int chunkThreshold, String conduitName) {
            super(message, needToCacheRequest, isChunking,
                  chunkThreshold, conduitName, ((Address)message.get(KEY_HTTP_CONNECTION_ADDRESS)).getURI());
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                if (pout != null) {
                    try {
                        pout.close();
                    } catch (IOException e) {
                        logStackTrace(e);
                    }
                    pout = null;
                }
                if (publisher != null) {
                    try {
                        publisher.close();
                    } catch (IOException e) {
                        logStackTrace(e);
                    }
                    publisher = null;
                }
                request = null;
                subscribers = null;
            }
            
            
        }
        void addSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
            subscribers.add(subscriber);
        }

        @Override
        protected void setFixedLengthStreamingMode(int i) {
            contentLen = i;
        }

        @Override
        protected void handleNoOutput() throws IOException {
            contentLen = 0;
            if (pout != null) {
                pout.close();
            }
            if (exception != null) {
                if (exception instanceof IOException) {
                    throw (IOException)exception;
                } else {
                    throw new IOException(exception);
                }
            }
        }

        public void setProtocolHeadersInBuilder(HttpRequest.Builder rb) throws IOException {
            boolean addHeaders = MessageUtils.getContextualBoolean(outMessage, Headers.ADD_HEADERS_PROPERTY, false);
            Headers h = new Headers(outMessage);
            boolean hasCT = false;
            for (Map.Entry<String, List<String>>  head : h.headerMap().entrySet()) {
                List<String> headerList = head.getValue();
                String header = head.getKey();
                if (RESTRICTED_HEADERS.contains(header)) {
                    //HttpClient does not allow some restricted headers
                    continue;
                }
                if (HttpHeaderHelper.CONTENT_TYPE.equalsIgnoreCase(header)) {
                    hasCT = true;
                    continue;
                }
                if (addHeaders || HttpHeaderHelper.COOKIE.equalsIgnoreCase(header)) {
                    headerList.forEach(s -> rb.header(header, s));
                } else {
                    rb.header(header, String.join(",", headerList));
                }
            }
            if (!h.headerMap().containsKey("User-Agent")) {
                rb.header("User-Agent", Headers.USER_AGENT);
            }

            if (hasCT || !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(outMessage.get(Message.HTTP_REQUEST_METHOD))) {
                boolean dropContentType = false;
                boolean emptyRequest = PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY));

                // If it is an empty request (without a request body) then check further if CT still needs be set
                if (emptyRequest) {
                    final Object setCtForEmptyRequestProp = outMessage
                        .getContextualProperty(Headers.SET_EMPTY_REQUEST_CT_PROPERTY);
                    if (setCtForEmptyRequestProp != null) {
                        // If SET_EMPTY_REQUEST_CT_PROPERTY is set then do as a user prefers.
                        // CT will be dropped if setting CT for empty requests was explicitly disabled
                        dropContentType = PropertyUtils.isFalse(setCtForEmptyRequestProp);
                    }
                }

                if (!dropContentType) {
                    rb.header(HttpHeaderHelper.CONTENT_TYPE, h.determineContentType());
                }
            }
        }

        private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOutputStream out)
            throws IOException {
            if (!connectionComplete) {
                // if we haven't connected yet, we'll see if an exception is the reason
                // why we haven't connected.  Otherwise, wait for the connection
                // to complete.
                if (future.isDone()) {
                    try {
                        future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        if (e.getCause() instanceof IOException) {
                            throw new Fault("Could not send Message.", LOG, (IOException)e.getCause());
                        }
                    }
                    return false;
                }
                try {
                    out.wait(csPolicy.getConnectionTimeout());
                } catch (InterruptedException e) {
                    //ignore
                }
                if (future.isDone()) {
                    try {
                        future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        if (e.getCause() instanceof IOException) {
                            throw new Fault("Could not send Message.", LOG, (IOException)e.getCause());
                        }
                    }
                    return false;
                }
            }
            return true;
        }


        @Override
        protected void setProtocolHeaders() throws IOException {
            HttpClient cl = outMessage.get(HttpClient.class);
            Address address = (Address)outMessage.get(KEY_HTTP_CONNECTION_ADDRESS);
            final HTTPClientPolicy csPolicy = getClient(outMessage);
            String httpRequestMethod =
                (String)outMessage.get(Message.HTTP_REQUEST_METHOD);


            if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
                || PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) {
                contentLen = 0;
            }

            if (csPolicy.isAllowChunking() || contentLen >= 0) {
                final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
                        ? 4096 : csPolicy.getChunkLength());
                this.publisher = new HttpClientBodyPublisher(this, () -> pin);
                if (contentLen != 0) {
                    pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
                }
            } else if (contentLen != 0) {
                // If chunking is not allowed but the contentLen is unknown (-1), we need to
                // buffer the request body stream until it is fully flushed by the client and only
                // than send the request.
                final CloseableByteArrayOutputStream baos = new CloseableByteArrayOutputStream();
                this.publisher = new HttpClientSizedBodyPublisher(this, csPolicy, baos::getInputStream);
                pout = baos;
            }

            HttpRequest.Builder rb = HttpRequest.newBuilder()
                .method(httpRequestMethod, publisher);
            String verc = (String)outMessage.getContextualProperty(FORCE_HTTP_VERSION);
            if (verc == null) {
                verc = csPolicy.getVersion();
            }
            if ("1.1".equals(HTTP_VERSION) || "1.1".equals(verc)) {
                rb.version(Version.HTTP_1_1);
            }
            try {
                rb.uri(address.getURI());
            } catch (IllegalArgumentException iae) {
                MalformedURLException mex = new MalformedURLException(iae.getMessage());
                mex.initCause(iae);
                throw mex;
            }

            rtimeout = determineReceiveTimeout(outMessage, csPolicy);
            if (rtimeout > 0) {
                rb.timeout(Duration.ofMillis(rtimeout));
            }

            setProtocolHeadersInBuilder(rb);

            request = rb.build();


            final BodyHandler<InputStream> handler =  BodyHandlers.ofInputStream();
            if (System.getSecurityManager() != null) {
                try {
                    future = AccessController.doPrivileged(
                            new PrivilegedExceptionAction<CompletableFuture<HttpResponse<InputStream>>>() {
                                @Override
                                public CompletableFuture<HttpResponse<InputStream>> run() throws IOException {
                                    return cl.sendAsync(request, handler);
                                }
                            });
                } catch (PrivilegedActionException e) {
                    throw new RuntimeException(e);
                }
            } else {
                future = cl.sendAsync(request, handler);
            }
            future.exceptionally(ex -> {
                if (pout != null) {
                    synchronized (pout) {
                        pout.notifyAll();
                    }
                }
                try {
                    close();
                } catch (IOException e) {
                    ex.addSuppressed(e);
                }
                return null;
            });
        }
        @Override
        protected void setupWrappedStream() throws IOException {
            if (cachingForRetransmission) {
                cachedStream =
                    new CacheAndWriteOutputStream(pout);
                wrappedStream = cachedStream;
            } else {
                wrappedStream = pout;
            }
            if (exception != null) {
                if (exception instanceof IOException) {
                    throw (IOException)exception;
                } else {
                    throw new IOException(exception);
                }
            }
        }
        @Override
        protected String getExceptionMessage(Throwable t) {
            if (t instanceof ConnectException && t.getMessage() == null) {
                return "Connection refused";
            }
            return t.getMessage();
        }

        HttpResponse<InputStream> getResponse() throws IOException {
            try {
                if (rtimeout > 0) {
                    return future.get(rtimeout, TimeUnit.MILLISECONDS);
                }
                return future.get();
            } catch (ExecutionException e) {
                Throwable t = e.getCause();
                if (t instanceof ConnectException) {
                    Throwable cause = t.getCause();
                    if (cause instanceof UnresolvedAddressException) {
                        UnknownHostException uhe = new UnknownHostException();
                        uhe.initCause(cause);
                        throw uhe;
                    }

                }
                if (t instanceof IOException) {
                    IOException iot = (IOException)t;
                    throw iot;
                }
                throw new IOException(t);
            } catch (InterruptedException e) {
                throw new IOException(e);
            } catch (TimeoutException e) {
                throw (IOException)(new HttpTimeoutException("Timeout").initCause(e));
            }

        }

        @Override
        protected int getResponseCode() throws IOException {
            return getResponse().statusCode();
        }
        @Override
        protected void updateResponseHeaders(Message inMessage) throws IOException {
            Headers h = new Headers(inMessage);
            HttpResponse<InputStream> rsp = getResponse();
            h.readFromConnection(rsp.headers().map());
            if (rsp.headers().map().containsKey(Message.CONTENT_TYPE)) {
                List<String> s = rsp.headers().allValues(Message.CONTENT_TYPE);
                inMessage.put(Message.CONTENT_TYPE, String.join(",", s));
            } else {
                inMessage.put(Message.CONTENT_TYPE, null);
            }
            cookies.readFromHeaders(h);
        }

        @Override
        protected InputStream getInputStream() throws IOException {
            HttpResponse<InputStream> resp = getResponse();
            String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
            int sc = resp.statusCode();
            if ("HEAD".equals(method)) {
                try (InputStream in = resp.body()) {
                    return null;
                }
            }
            if (sc == 204) {
                //no content
                return null;
            }
            if ("OPTIONS".equals(method) || (sc >= 300 && sc < 500)) {
                Optional<String> f = resp.headers().firstValue("content-length");
                Optional<String> fChunk = resp.headers().firstValue("transfer-encoding");
                if (f.isPresent()) {
                    long l = Long.parseLong(f.get());
                    if (l == 0) {
                        try (InputStream in = resp.body()) {
                            return null;
                        }
                    }
                } else if (!fChunk.isPresent() || !"chunked".equals(fChunk.get())) {
                    if (resp.version() == Version.HTTP_2) {
                        InputStream in = resp.body();
                        if (in.available() <= 0) {
                            try (in) {
                                return null;
                            }
                        }
                    } else {
                        try (InputStream in = resp.body()) {
                            return null;
                        }
                    }
                }
            }
            return new HttpClientFilteredInputStream(resp.body());
        }

        @Override
        protected void closeInputStream() throws IOException {
            InputStream is = getInputStream();
            if (is != null) {
                is.close();
            }
        }

        @Override
        protected void handleResponseAsync() throws IOException {
            handleResponseOnWorkqueue(true, false);
        }
        @Override
        public void thresholdReached() throws IOException {
            //not really a way to set the chunk size so not really anything to do
            if (exception != null) {
                if (exception instanceof IOException) {
                    throw (IOException)exception;
                } else {
                    throw new IOException(exception);
                }
            }
        }

        @Override
        protected String getResponseMessage() throws IOException {
            try {
                // HttpClient does not provide access to the actual status message
                // We'll map some of the status codes to match the
                // returns from the HTTPUrlConnection
                HttpResponse<InputStream> in = getResponse();
                switch (in.statusCode()) {
                case 404:
                    return "Not Found";
                case 405:
                    return "Method Not Allowed";
                case 503:
                    return "Service Unavailable";
                case 200:
                    return "OK";
                default:
                    return in.toString();
                }
            } catch (IOException e) {
                //ignore
            }
            return null;
        }

        @Override
        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
            Address addrss = (Address)outMessage.get(KEY_HTTP_CONNECTION_ADDRESS);
            URI uri = addrss.getURI();

            if ("http".equals(uri.getScheme())) {
                return null;
            }
            String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
            HttpClient cl = outMessage.get(HttpClient.class);

            while (!connectionComplete || !cl.sslContext().getClientSessionContext().getIds().hasMoreElements()) {
                Thread.yield();
            }
            byte[] key = cl.sslContext().getClientSessionContext().getIds().nextElement();
            SSLSession session = cl.sslContext().getClientSessionContext().getSession(key);
            Certificate[] localCerts = session.getLocalCertificates();
            String cipherSuite = session.getCipherSuite();
            Principal principal = session.getLocalPrincipal();
            Certificate[] serverCerts = session.getPeerCertificates();
            Principal peer = session.getPeerPrincipal();

            HttpsURLConnectionInfo info = new HttpsURLConnectionInfo(uri, method, cipherSuite,
                                                                     localCerts, principal,
                                                                     serverCerts, peer);

            return info;
        }



        @Override
        protected boolean usingProxy() {
            HttpClient cl = outMessage.get(HttpClient.class);
            return cl.proxy().isPresent();
        }


        @Override
        protected InputStream getPartialResponse() throws IOException {
            HttpResponse<InputStream> rsp = getResponse();
            int responseCode = rsp.statusCode();
            if (responseCode == HttpURLConnection.HTTP_ACCEPTED
                || responseCode == HttpURLConnection.HTTP_OK) {
                try {
                    PushbackInputStream pbin =
                        new PushbackInputStream(rsp.body());
                    int c = pbin.read();
                    if (c != -1) {
                        pbin.unread((byte)c);
                        return pbin;
                    }
                } catch (IOException ioe) {
                    // ignore
                }
            }
            // Don't need to do anything
            return null;
        }

        @Override
        protected void setupNewConnection(String newURL) throws IOException {
            connectionComplete = false;

            HTTPClientPolicy cp = getClient(outMessage);
            Address address;
            try {
                if (defaultAddress.getString().equals(newURL)) {
                    address = defaultAddress;
                } else {
                    address = new Address(newURL);
                }
            } catch (URISyntaxException e) {
                throw new IOException(e);
            }
            setupConnection(outMessage, address, cp);
            this.url = address.getURI();
        }

        @Override
        protected void retransmitStream() throws IOException {
            cachedStream.writeCacheTo(pout);
            if (pout != null) {
                pout.close();
            }
        }

        @Override
        protected void updateCookiesBeforeRetransmit() throws IOException {
            Headers h = new Headers();
            HttpResponse<InputStream> rsp = getResponse();
            h.readFromConnection(rsp.headers().map());
            cookies.readFromHeaders(h);
        }

    }

}