ChunkedBodyOutputStream.java

/*
 * Copyright (c) 2015, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.jdk.connector.internal;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.Buffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;


/**
 * Body stream that can operate either synchronously or asynchronously. See {@link BodyOutputStream} for details.
 *
 * @author Petr Janouch
 */
class ChunkedBodyOutputStream extends BodyOutputStream {

    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

    private final int chunkSize;
    private final int encodedFullChunkSize;

    // this stream is buffering by default; it has pending data up to dataBuffer.capacity()
    private final ByteBuffer dataBuffer;

    // in sync. mode, the write operations will block until the stream is opened for data
    private final CountDownLatch initialBlockingLatch = new CountDownLatch(1);

    private volatile Filter<ByteBuffer, ?, ?, ?> downstreamFilter;
    private volatile WriteListener writeListener = null;
    // an internal listener, so the connector can be notified when the stream has been closed (=body has been sent)
    private volatile Listener closeListener;
    // mode this stream operates in
    private volatile Mode mode = Mode.UNDECIDED;
    private volatile boolean ready = false;
    // flag to make sure that a listener is called only for the first time or after isReady() returned false
    private volatile boolean callListener = true;

    private volatile boolean closed = false;

    ChunkedBodyOutputStream(int chunkSize) {
        this.chunkSize = chunkSize;
        this.dataBuffer = ByteBuffer.allocate(chunkSize);
        this.encodedFullChunkSize = HttpRequestEncoder.getChunkSize(chunkSize);
    }

    @Override
    public synchronized void setWriteListener(WriteListener writeListener) {
        if (this.writeListener != null) {
            throw new IllegalStateException(LocalizationMessages.WRITE_LISTENER_SET_ONLY_ONCE());
        }

        assertAsynchronousOperation();
        this.writeListener = writeListener;
        commitToMode();

        if (ready && callListener) {
            callOnWritePossible();
        }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        commitToMode();

        // input validation borrowed from OutputStream
        if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0)
                || ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }

        assertValidState();
        doInitialBlocking();

        if (len < dataBuffer.remaining()) {
            // if the data fit into the buffer, use write per byte
            for (int i = off; i < off + len; i++) {
                write(b[i]);
            }
        } else {
            // if the data overflow the buffer, send a multiple of the buffer size and buffer the remainder
            int currentDataLength = ((Buffer) dataBuffer).position() + len;
            int remainder = currentDataLength % dataBuffer.capacity();
            // buffer that will be send
            ByteBuffer buffer = ByteBuffer.allocate(currentDataLength - remainder);
            ((Buffer) dataBuffer).flip();
            // put currently buffered data
            buffer.put(dataBuffer);
            // fill the rest with passed data
            buffer.put(b, off, len - remainder);
            ((Buffer) buffer).flip();
            ((Buffer) dataBuffer).clear();
            // buffer remaining data
            dataBuffer.put(b, off + len - remainder, remainder);
            // send the to-be-written buffer
            write(buffer);
        }
    }

    @Override
    public void flush() throws IOException {
        super.flush();
        if (mode == Mode.UNDECIDED) {
            // if we are not committed to any mode, any of the write operations has not been invoked yet
            return;
        }

        if (mode == Mode.ASYNCHRONOUS) {
            assertValidState();
        }

        if (((Buffer) dataBuffer).position() == 0) {
            // there is nothing buffered, so don't bother
            return;
        }

        ((Buffer) dataBuffer).flip();
        write(dataBuffer);
    }

    @Override
    public void write(int b) throws IOException {
        commitToMode();
        assertValidState();
        doInitialBlocking();

        dataBuffer.put((byte) b);
        if (!dataBuffer.hasRemaining()) {
            // send the buffer if we have just filled it.
            ((Buffer) dataBuffer).flip();
            write(dataBuffer);
        }
    }

    @Override
    public boolean isReady() {
        // TODO we might support this in synchronous mode too
        assertAsynchronousOperation();

        if (!ready) {
            callListener = true;
        }

        return ready;
    }

    private void assertValidState() {
        if (closed) {
            throw new IllegalStateException(LocalizationMessages.STREAM_CLOSED());
        }

        if (mode == Mode.ASYNCHRONOUS && !ready) {
            // we are in asynchronous mode, but the user called write when the stream in non-ready state
            throw new IllegalStateException(LocalizationMessages.WRITE_WHEN_NOT_READY());
        }
    }

    protected void write(final ByteBuffer byteBuffer) throws IOException {
        // do transport encoding on the raw data
        ByteBuffer httpChunk = encodeToHttp(byteBuffer);

        if (mode == Mode.SYNCHRONOUS) {
            final CountDownLatch writeLatch = new CountDownLatch(1);
            final AtomicReference<Throwable> error = new AtomicReference<>();
            downstreamFilter.write(httpChunk, new CompletionHandler<ByteBuffer>() {
                @Override
                public void completed(ByteBuffer result) {
                    writeLatch.countDown();
                }

                @Override
                public void failed(Throwable t) {
                    error.set(t);
                    writeLatch.countDown();
                }
            });

            try {
                // block until the operation has completed
                writeLatch.await();
            } catch (InterruptedException e) {
                throw new IOException(LocalizationMessages.WRITING_FAILED(), e);
            }

            ((Buffer) byteBuffer).clear();

            Throwable t = error.get();
            // check fo any errors
            if (t != null) {
                throw new IOException(LocalizationMessages.WRITING_FAILED(), t);
            }
        } else {
            ready = false;
            downstreamFilter.write(httpChunk, new CompletionHandler<ByteBuffer>() {

                @Override
                public void completed(ByteBuffer result) {
                    ready = true;
                    ((Buffer) byteBuffer).clear();
                    if (callListener) {
                        callOnWritePossible();
                    }
                }

                @Override
                public void failed(Throwable throwable) {
                    ready = false;
                    writeListener.onError(throwable);
                }
            });
        }
    }

    synchronized void open(Filter<ByteBuffer, ?, ?, ?> downstreamFilter) {
        this.downstreamFilter = downstreamFilter;
        initialBlockingLatch.countDown();
        ready = true;

        if (mode == Mode.ASYNCHRONOUS && writeListener != null) {
            callOnWritePossible();
        }
    }

    protected void doInitialBlocking() throws IOException {
        if (mode != Mode.SYNCHRONOUS || downstreamFilter != null) {
            return;
        }

        try {
            initialBlockingLatch.await();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    protected synchronized void commitToMode() {
        // return if the mode has already been committed
        if (mode != Mode.UNDECIDED) {
            return;
        }

        // go asynchronous, if the user has made any move suggesting asynchronous mode
        if (writeListener != null) {
            mode = Mode.ASYNCHRONOUS;
            return;
        }

        // go synchronous, if the user has not made any suggesting asynchronous mode
        mode = Mode.SYNCHRONOUS;
    }

    private void assertAsynchronousOperation() {
        if (mode == Mode.SYNCHRONOUS) {
            throw new UnsupportedOperationException(LocalizationMessages.ASYNC_OPERATION_NOT_SUPPORTED());
        }
    }

    private void callOnWritePossible() {
        callListener = false;
        try {
            writeListener.onWritePossible();
        } catch (IOException e) {
            writeListener.onError(e);
        }
    }

    /**
     * Set a close listener which will be called when the user closes the stream.
     * <p/>
     * This is used to indicate that the body has been completely written.
     *
     * @param closeListener close listener.
     */
    synchronized void setCloseListener(Listener closeListener) {
        this.closeListener = closeListener;
    }

    /**
     * Transform raw application data into HTTP body.
     *
     * @param byteBuffer application data.
     * @return http body part.
     */
    protected ByteBuffer encodeToHttp(ByteBuffer byteBuffer) {
        // we expect the size of the buffer to be either a multiple of chunkSize
        // or smaller than chunkSize in case of the last content-carrying chunk and closing chunk (the one sent by close())
        if (byteBuffer.remaining() < chunkSize) {
            return HttpRequestEncoder.encodeChunk(byteBuffer);
        }

        if (byteBuffer.remaining() % chunkSize != 0) {
            // the buffer is neither a multiple of chunkSize nor smaller than chunkSize
            throw new IllegalStateException(LocalizationMessages.BUFFER_INCORRECT_LENGTH());
        }

        int numberOfChunks = byteBuffer.remaining() / chunkSize;
        ByteBuffer encodedChunks = ByteBuffer.allocate(numberOfChunks * encodedFullChunkSize);

        for (int i = 0; i < numberOfChunks; i++) {
            ((Buffer) byteBuffer).position(i * chunkSize);
            ((Buffer) byteBuffer).limit(i * chunkSize + chunkSize);
            ByteBuffer encodeChunk = HttpRequestEncoder.encodeChunk(byteBuffer);
            encodedChunks.put(encodeChunk);
        }

        ((Buffer) encodedChunks).flip();
        return encodedChunks;
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }

            commitToMode();
            // just in case close is invoked without any data being written
            doInitialBlocking();
            flush();
            // chunk-encoded message is finished with an empty chunk
            write(EMPTY_BUFFER);
            super.close();

        closed = true;
        synchronized (this) {
            if (closeListener != null) {
                closeListener.onClosed();
            }
        }
    }

    /**
     * Set a close listener which will be called when the user closes the stream.
     * <p/>
     * This is used to indicate that the body has been completely written.
     */
    interface Listener {

        void onClosed();
    }

    private enum Mode {
        SYNCHRONOUS,
        ASYNCHRONOUS,
        UNDECIDED
    }
}