QueueInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.build.AbstractStreamBuilder;
import org.apache.commons.io.output.QueueOutputStream;

/**
 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
 * <p>
 * To build an instance, use {@link Builder}.
 * </p>
 * <p>
 * Example usage:
 * </p>
 * <pre>
 * QueueInputStream inputStream = new QueueInputStream();
 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
 *
 * outputStream.write("hello world".getBytes(UTF_8));
 * inputStream.read();
 * </pre>
 * <p>
 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
 * </p>
 * <p>
 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
 * {@link IOException}.
 * </p>
 *
 * @see Builder
 * @see QueueOutputStream
 * @since 2.9.0
 */
public class QueueInputStream extends InputStream {

    // @formatter:off
    /**
     * Builds a new {@link QueueInputStream}.
     *
     * <p>
     * For example:
     * </p>
     * <pre>{@code
     * QueueInputStream s = QueueInputStream.builder()
     *   .setBlockingQueue(new LinkedBlockingQueue<>())
     *   .setTimeout(Duration.ZERO)
     *   .get();}
     * </pre>
     *
     * @see #get()
     * @since 2.12.0
     */
    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {

        private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        private Duration timeout = Duration.ZERO;

        /**
         * Constructs a new builder of {@link QueueInputStream}.
         */
        public Builder() {
            // empty
        }

        /**
         * Builds a new {@link QueueInputStream}.
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
         * <li>timeout</li>
         * </ul>
         *
         * @return a new instance.
         * @see #getUnchecked()
         */
        @Override
        public QueueInputStream get() {
            return new QueueInputStream(this);
        }

        /**
         * Sets backing queue for the stream.
         *
         * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
         * @return {@code this} instance.
         */
        public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
            return this;
        }

        /**
         * Sets the polling timeout.
         *
         * @param timeout the polling timeout.
         * @return {@code this} instance.
         */
        public Builder setTimeout(final Duration timeout) {
            if (timeout != null && timeout.toNanos() < 0) {
                throw new IllegalArgumentException("timeout must not be negative");
            }
            this.timeout = timeout != null ? timeout : Duration.ZERO;
            return this;
        }

    }

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

    private final BlockingQueue<Integer> blockingQueue;

    private final long timeoutNanos;

    /**
     * Constructs a new instance with no limit to its internal queue size and zero timeout.
     */
    public QueueInputStream() {
        this(new LinkedBlockingQueue<>());
    }

    /**
     * Constructs a new instance with given queue and zero timeout.
     *
     * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
     */
    @Deprecated
    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
        this(builder().setBlockingQueue(blockingQueue));
    }

    /**
     * Constructs a new instance.
     *
     * @param builder The builder.
     */
    private QueueInputStream(final Builder builder) {
        this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
        this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
    }

    /**
     * Gets the blocking queue.
     *
     * @return the blocking queue.
     */
    BlockingQueue<Integer> getBlockingQueue() {
        return blockingQueue;
    }

    /**
     * Gets the timeout duration.
     *
     * @return the timeout duration.
     */
    Duration getTimeout() {
        return Duration.ofNanos(timeoutNanos);
    }

    /**
     * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
     *
     * @return QueueOutputStream connected to this stream.
     */
    public QueueOutputStream newQueueOutputStream() {
        return new QueueOutputStream(blockingQueue);
    }

    /**
     * Reads and returns a single byte.
     *
     * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
     * @throws IllegalStateException if thread is interrupted while waiting.
     */
    @Override
    public int read() {
        try {
            final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
            return value == null ? EOF : 0xFF & value;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            // throw runtime unchecked exception to maintain signature backward-compatibility of
            // this read method, which does not declare IOException
            throw new IllegalStateException(e);
        }
    }

    /**
     * Reads up to {@code length} bytes of data from the input stream into
     * an array of bytes.  The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
     * the timeout. The number of bytes actually read is returned as an integer.
     *
     * @param b     the buffer into which the data is read.
     * @param offset   the start offset in array {@code b} at which the data is written.
     * @param length   the maximum number of bytes to read.
     * @return     the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
     *              end of the stream has been reached.
     * @throws NullPointerException If {@code b} is {@code null}.
     * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
     * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
     *             greater than {@code b.length - offset}.
     * @since 2.20.0
     */
    @Override
    public int read(final byte[] b, final int offset, final int length) {
        if (b == null) {
            throw new NullPointerException();
        }
        if (offset < 0 || length < 0 || length > b.length - offset) {
            throw new IndexOutOfBoundsException(
                    String.format("Range [%d, %<d + %d) out of bounds for length %d", offset, length, b.length));
        }
        if (length == 0) {
            return 0;
        }
        final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
        blockingQueue.drainTo(drain, length);
        if (drain.isEmpty()) {
            // no data immediately available. wait for first byte
            final int value = read();
            if (value == EOF) {
                return EOF;
            }
            drain.add(value);
            blockingQueue.drainTo(drain, length - 1);
        }
        int i = 0;
        for (final Integer value : drain) {
            b[offset + i] = (byte) (0xFF & value);
            i++;
        }
        return i;
    }

}