/*
 * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.client;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.ReaderInterceptor;

import org.glassfish.jersey.client.internal.LocalizationMessages;
import org.glassfish.jersey.internal.PropertiesDelegate;
import org.glassfish.jersey.message.MessageBodyWorkers;

/**
 * Response entity type used for receiving messages in "typed" chunks.
 * <p/>
 * This data type is useful for consuming partial responses from large or continuous data
 * input streams.
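 * <p/>
 * A typical consumption sketch (illustrative only; the {@code target} instance and the resource it points to are
 * assumptions, not part of this class):
 * <pre>{@code
 * Response response = target.request().get();
 * try (ChunkedInput<String> input = response.readEntity(new GenericType<ChunkedInput<String>>() {})) {
 *     String chunk;
 *     while ((chunk = input.read()) != null) {
 *         System.out.println("Next chunk received: " + chunk);
 *     }
 * }
 * }</pre>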
 *
 * @param <T> chunk type.
 * @author Marek Potociar
 */
@SuppressWarnings("UnusedDeclaration")
public class ChunkedInput<T> extends GenericType<T> implements Closeable {

    private static final Logger LOGGER = Logger.getLogger(ChunkedInput.class.getName());

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private ChunkParser parser = createParser("\r\n");
    private MediaType mediaType;

    private final InputStream inputStream;
    private final Annotation[] annotations;
    private final MultivaluedMap<String, String> headers;
    private final MessageBodyWorkers messageBodyWorkers;
    private final PropertiesDelegate propertiesDelegate;

    /**
     * Create new chunk parser that will split the response entity input stream
     * based on a fixed boundary string.
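     * <p>
     * A usage sketch (illustrative; the {@code input} instance below stands for an already received chunked input):
     * <pre>{@code
     * // split chunks on a single new-line instead of the default "\r\n" boundary
     * input.setParser(ChunkedInput.createParser("\n"));
     * }</pre>
     * </p>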
     *
     * @param boundary chunk boundary.
     * @return new fixed boundary string-based chunk parser.
     */
    public static ChunkParser createParser(final String boundary) {
        return new FixedBoundaryParser(boundary.getBytes());
    }

    /**
     * Create new chunk parser that will split the response entity input stream
     * based on a fixed boundary sequence of bytes.
     *
     * @param boundary chunk boundary.
     * @return new fixed boundary sequence-based chunk parser.
     */
    public static ChunkParser createParser(final byte[] boundary) {
        return new FixedBoundaryParser(boundary);
    }

    /**
     * Create a new chunk multi-parser that will split the response entity input stream
     * based on multiple fixed boundary strings.
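     * <p>
     * A usage sketch (illustrative; the {@code input} instance below stands for an already received chunked input):
     * <pre>{@code
     * // accept either a CR-LF pair or a plain LF as a chunk boundary
     * input.setParser(ChunkedInput.createMultiParser("\r\n", "\n"));
     * }</pre>
     * </p>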
     *
     * @param boundaries chunk boundaries.
     * @return new fixed boundary string-based chunk parser.
     */
    public static ChunkParser createMultiParser(final String... boundaries) {
        return new FixedMultiBoundaryParser(boundaries);
    }

    private abstract static class AbstractBoundaryParser implements ChunkParser {

        @Override
        public byte[] readChunk(final InputStream in) throws IOException {
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] delimiterBuffer = new byte[getDelimiterBufferSize()];

            int data;
            int dPos;
            do {
                dPos = 0;
                while ((data = in.read()) != -1) {
                    final byte b = (byte) data;
                    byte[] delimiter = getDelimiter(b, dPos, delimiterBuffer);

                    // last read byte is part of the chunk delimiter
                    if (delimiter != null && b == delimiter[dPos]) {
                        delimiterBuffer[dPos++] = b;
                        if (dPos == delimiter.length) {
                            // found chunk delimiter
                            break;
                        }
                    } else if (dPos > 0) {
                        delimiter = getDelimiter(dPos - 1, delimiterBuffer);
                        delimiterBuffer[dPos] = b;

                        int matched = matchTail(delimiterBuffer, 1, dPos, delimiter);
                        if (matched == 0) {
                            // flush delimiter buffer
                            buffer.write(delimiterBuffer, 0, dPos);
                            buffer.write(b);
                            dPos = 0;
                        } else if (matched == delimiter.length) {
                            // found chunk delimiter
                            break;
                        } else {
                            // one or more elements of a previous buffered delimiter
                            // are parts of a current buffered delimiter
                            buffer.write(delimiterBuffer, 0, dPos + 1 - matched);
                            dPos = matched;
                        }
                    } else {
                        buffer.write(b);
                    }
                }

            } while (data != -1 && buffer.size() == 0); // skip an empty chunk

            if (dPos > 0 && dPos != getDelimiter(dPos - 1, delimiterBuffer).length) {
                // flush the delimiter buffer, if not empty - parsing finished in the middle of a potential delimiter sequence
                buffer.write(delimiterBuffer, 0, dPos);
            }

            return (buffer.size() > 0) ? buffer.toByteArray() : null;
        }

        /**
         * Selects the delimiter that corresponds to the current delimiter buffer content. The method conceptually
         * places the {@code b} parameter at position {@code pos} of the {@code delimiterBuffer} array and then runs
         * the selection process against the resulting byte sequence.
         *
         * @param b               byte to be considered at position {@code pos} of the {@code delimiterBuffer} array
         * @param pos             number of bytes from the delimiter buffer that are used in the selection
         * @param delimiterBuffer current content of the delimiter buffer
         * @return delimiter that corresponds to the delimiter buffer, or {@code null} if none matches
         */
        abstract byte[] getDelimiter(byte b, int pos, byte[] delimiterBuffer);

        /**
         * Selects the delimiter that corresponds to the current delimiter buffer content.
         *
         * @param pos             position of the last buffered byte in the delimiter buffer
         * @param delimiterBuffer current content of the delimiter buffer
         * @return delimiter that corresponds to the delimiter buffer, or {@code null} if none matches
         */
        abstract byte[] getDelimiter(int pos, byte[] delimiterBuffer);

        /**
         * Returns a delimiter buffer size depending on the selected strategy.
         * <p>
         * If a strategy has multiple registered delimiters, the delimiter buffer must be at least as long as the
         * longest registered delimiter.
         *
         * @return length of the delimiter buffer
         */
        abstract int getDelimiterBufferSize();

        /**
         * Tries to find an element intersection between two arrays in a way that intersecting elements must be
         * at the tail of the first array and at the beginning of the second array.
         * <p>
         * For example, consider the following two arrays:
         * <pre>
         * a1: {a, b, c, d, e}
         * a2: {d, e, f, g}
         * </pre>
         * In this example, the intersection of the tail of {@code a1} with the head of {@code a2} is <tt>{d, e}</tt>
         * and consists of 2 overlapping elements.
         * </p>
         * The method takes the first array represented as a sub-array in buffer demarcated by an offset and length.
         * The second array is a fixed pattern to be matched. The method then compares the tail of the
         * array in the buffer with the head of the pattern and returns the number of intersecting elements,
         * or zero in case the two arrays do not intersect tail to head.
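         * <p>
         * For the arrays above this corresponds to the following call (illustrative only):
         * <pre>{@code
         * // returns 2 -- the overlapping elements are {d, e}
         * matchTail(new byte[] {'a', 'b', 'c', 'd', 'e'}, 0, 5, new byte[] {'d', 'e', 'f', 'g'});
         * }</pre>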
         *
         * @param buffer  byte buffer containing the array whose tail to intersect.
         * @param offset  start of the array to be tail-matched in the {@code buffer}.
         * @param length  length of the array to be tail-matched.
         * @param pattern pattern to be head-matched.
         * @return {@code 0} if no tail of the array in the buffer matches a head of the pattern,
         * otherwise the number of overlapping elements (the length of the longest matching tail).
         */
        private static int matchTail(byte[] buffer, int offset, int length, byte[] pattern) {
            if (pattern == null) {
                return 0;
            }

            outer:
            for (int i = 0; i < length; i++) {
                final int tailLength = length - i;
                for (int j = 0; j < tailLength; j++) {
                    if (buffer[offset + i + j] != pattern[j]) {
                        // mismatch - continue with shorter tail
                        continue outer;
                    }
                }

                // found the longest matching tail
                return tailLength;
            }
            return 0;
        }
    }

    private static class FixedBoundaryParser extends AbstractBoundaryParser {

        private final byte[] delimiter;

        public FixedBoundaryParser(final byte[] boundary) {
            delimiter = Arrays.copyOf(boundary, boundary.length);
        }

        @Override
        byte[] getDelimiter(byte b, int pos, byte[] delimiterBuffer) {
            return delimiter;
        }

        @Override
        byte[] getDelimiter(int pos, byte[] delimiterBuffer) {
            return delimiter;
        }

        @Override
        int getDelimiterBufferSize() {
            return delimiter.length;
        }
    }

    private static class FixedMultiBoundaryParser extends AbstractBoundaryParser {

        private final List<byte[]> delimiters = new ArrayList<byte[]>();

        private final int longestDelimiterLength;

        public FixedMultiBoundaryParser(String... boundaries) {
            for (String boundary : boundaries) {
                byte[] boundaryBytes = boundary.getBytes();
                delimiters.add(Arrays.copyOf(boundaryBytes, boundaryBytes.length));
            }

            Collections.sort(delimiters, new Comparator<byte[]>() {
                @Override
                public int compare(byte[] o1, byte[] o2) {
                    return Integer.compare(o1.length, o2.length);
                }
            });

            byte[] longestDelimiter = delimiters.get(delimiters.size() - 1);
            this.longestDelimiterLength = longestDelimiter.length;
        }

        @Override
        byte[] getDelimiter(byte b, int pos, byte[] delimiterBuffer) {
            byte[] buffer = Arrays.copyOf(delimiterBuffer, delimiterBuffer.length);
            buffer[pos] = b;

            return getDelimiter(pos, buffer);
        }

        @Override
        byte[] getDelimiter(int pos, byte[] delimiterBuffer) {
            outer:
            for (byte[] delimiter : delimiters) {
                if (pos > delimiter.length) {
                    continue;
                }

                for (int i = 0; i <= pos && i < delimiter.length; i++) {
                    if (delimiter[i] != delimiterBuffer[i]) {
                        continue outer;
                    } else if (pos == i) {
                        return delimiter;
                    }
                }
            }

            return null;
        }

        @Override
        int getDelimiterBufferSize() {
            return this.longestDelimiterLength;
        }
    }

    /**
     * Constructor used by the {@link ChunkedInputReader}.
     *
     * @param chunkType          chunk type.
     * @param inputStream        response input stream.
     * @param annotations        annotations associated with response entity.
     * @param mediaType          response entity media type.
     * @param headers            response headers.
     * @param messageBodyWorkers message body workers.
     * @param propertiesDelegate properties delegate for this request/response.
     */
    protected ChunkedInput(
            final Type chunkType,
            final InputStream inputStream,
            final Annotation[] annotations,
            final MediaType mediaType,
            final MultivaluedMap<String, String> headers,
            final MessageBodyWorkers messageBodyWorkers,
            final PropertiesDelegate propertiesDelegate) {
        super(chunkType);

        this.inputStream = inputStream;
        this.annotations = annotations;
        this.mediaType = mediaType;
        this.headers = headers;
        this.messageBodyWorkers = messageBodyWorkers;
        this.propertiesDelegate = propertiesDelegate;
    }

    /**
     * Get the underlying chunk parser.
     * <p>
     * Note: Access to internal chunk parser is not a thread-safe operation and has to be explicitly synchronized
     * in case the chunked input is used from multiple threads.
     * </p>
     *
     * @return underlying chunk parser.
     */
    public ChunkParser getParser() {
        return parser;
    }

    /**
     * Set new chunk parser.
     * <p>
     * Note: Access to internal chunk parser is not a thread-safe operation and has to be explicitly synchronized
     * in case the chunked input is used from multiple threads.
     * </p>
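     * <p>
     * Besides the parsers produced by the {@code createParser} factory methods, any custom {@link ChunkParser}
     * implementation may be supplied. A sketch of a hypothetical parser that treats every block of up to 1024 bytes
     * as a single chunk (illustrative only; not part of this API):
     * <pre>{@code
     * input.setParser(in -> {
     *     byte[] buffer = new byte[1024];
     *     int read = in.read(buffer);
     *     return (read <= 0) ? null : Arrays.copyOf(buffer, read);
     * });
     * }</pre>
     * </p>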
     *
     * @param parser new chunk parser.
     */
    public void setParser(final ChunkParser parser) {
        this.parser = parser;
    }

    /**
     * Get chunk data media type.
     * <p/>
     * Default chunk data media type is derived from the value of the response
     * <tt>{@value javax.ws.rs.core.HttpHeaders#CONTENT_TYPE}</tt> header field.
     * This default value may be manually overridden by {@link #setChunkType(javax.ws.rs.core.MediaType) setting}
     * a custom non-{@code null} chunk media type value.
     * <p>
     * Note: Access to internal chunk media type is not a thread-safe operation and has to
     * be explicitly synchronized in case the chunked input is used from multiple threads.
     * </p>
     *
     * @return media type specific to each chunk of data.
     */
    public MediaType getChunkType() {
        return mediaType;
    }

    /**
     * Set custom chunk data media type.
     * <p/>
     * By default, chunk data media type is derived from the value of the response
     * <tt>{@value javax.ws.rs.core.HttpHeaders#CONTENT_TYPE}</tt> header field.
     * Using this method will override the default chunk media type value and set it
     * to a custom non-{@code null} chunk media type. Once this method is invoked,
     * all subsequent {@link #read chunk reads} will use the newly set chunk media
     * type when selecting the proper {@link javax.ws.rs.ext.MessageBodyReader} for
     * chunk de-serialization.
     * <p>
     * Note: Access to internal chunk media type is not a thread-safe operation and has to
     * be explicitly synchronized in case the chunked input is used from multiple threads.
     * </p>
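     * <p>
     * A usage sketch (illustrative; assumes the chunks carry JSON payloads):
     * <pre>{@code
     * // de-serialize all subsequent chunks as JSON, regardless of the response Content-Type header
     * input.setChunkType(MediaType.APPLICATION_JSON_TYPE);
     * }</pre>
     * </p>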
     *
     * @param mediaType custom chunk data media type. Must not be {@code null}.
     * @throws IllegalArgumentException in case the {@code mediaType} is {@code null}.
     */
    public void setChunkType(final MediaType mediaType) throws IllegalArgumentException {
        if (mediaType == null) {
            throw new IllegalArgumentException(LocalizationMessages.CHUNKED_INPUT_MEDIA_TYPE_NULL());
        }
        this.mediaType = mediaType;
    }

    /**
     * Set custom chunk data media type from a string value.
     * <p>
     * Note: Access to internal chunk media type is not a thread-safe operation and has to
     * be explicitly synchronized in case the chunked input is used from multiple threads.
     * </p>
     *
     * @param mediaType custom chunk data media type. Must not be {@code null}.
     * @throws IllegalArgumentException in case the {@code mediaType} cannot be parsed into
     *                                  a valid {@link MediaType} instance or is {@code null}.
     * @see #setChunkType(javax.ws.rs.core.MediaType)
     */
    public void setChunkType(final String mediaType) throws IllegalArgumentException {
        this.mediaType = MediaType.valueOf(mediaType);
    }

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (final IOException e) {
                    LOGGER.log(Level.FINE, LocalizationMessages.CHUNKED_INPUT_STREAM_CLOSING_ERROR(), e);
                }
            }
        }
    }

    /**
     * Check if the chunked input has been closed.
     *
     * @return {@code true} if this chunked input has been closed, {@code false} otherwise.
     */
    public boolean isClosed() {
        return closed.get();
    }

    /**
     * Read next chunk from the response stream and convert it to a Java instance
     * using the {@link #getChunkType() chunk media type}. The method returns {@code null}
     * if the underlying entity input stream has been closed (either implicitly or explicitly
     * by calling the {@link #close()} method).
     * <p>
     * Note: Access to internal chunk parser is not a thread-safe operation and has to be explicitly
     * synchronized in case the chunked input is used from multiple threads.
     * </p>
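     * <p>
     * A sketch of the termination contract (illustrative; assumes a {@code ChunkedInput<String> input} instance):
     * <pre>{@code
     * String chunk = input.read();   // next chunk, or null once the underlying stream has ended
     * if (chunk == null) {
     *     assert input.isClosed();   // a null result implies the chunked input has been closed
     * }
     * }</pre>
     * </p>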
     *
     * @return next streamed chunk or {@code null} if the underlying entity input stream
     * has been closed while reading next chunk data.
     * @throws IllegalStateException in case this chunked input has been closed.
     */
    @SuppressWarnings("unchecked")
    public T read() throws IllegalStateException {
        if (closed.get()) {
            throw new IllegalStateException(LocalizationMessages.CHUNKED_INPUT_CLOSED());
        }

        try {
            final byte[] chunk = parser.readChunk(inputStream);
            if (chunk == null) {
                close();
            } else {
                final ByteArrayInputStream chunkStream = new ByteArrayInputStream(chunk);
                // TODO: add interceptors: interceptors are used in ChunkedOutput, so the stream should
                // be intercepted in the ChunkedInput too. Interceptors cannot be easily added to the readFrom
                // method as they should wrap the stream before it is processed by ChunkParser. Also please check todo
                // in ChunkedOutput (this should be fixed together with this todo)
                // issue: JERSEY-1809
                return (T) messageBodyWorkers.readFrom(
                        getRawType(),
                        getType(),
                        annotations,
                        mediaType,
                        headers,
                        propertiesDelegate,
                        chunkStream,
                        Collections.<ReaderInterceptor>emptyList(),
                        false);
            }
        } catch (final IOException e) {
            Logger.getLogger(this.getClass().getName()).log(Level.FINE, e.getMessage(), e);
            close();
        }
        return null;
    }
}