NonBlockingInputStreamImpl.java

/*
 * Copyright (c) 2002-2018, the original author(s).
 *
 * This software is distributable under the BSD license. See the terms of the
 * BSD license in the documentation provided with this software.
 *
 * https://opensource.org/licenses/BSD-3-Clause
 */
package org.jline.utils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

/**
 * This class wraps a regular input stream and allows it to appear as if it
 * is non-blocking; that is, reads can be performed against it that timeout
 * if no data is seen for a period of time.  This effect is achieved by having
 * a separate thread perform all non-blocking read requests and then
 * waiting on the thread to complete.
 *
 * <p>VERY IMPORTANT NOTES
 * <ul>
 *   <li> This class is not thread safe. It expects at most one reader.
 *   <li> The {@link #shutdown()} method must be called in order to shut down
 *          the thread that handles blocking I/O.
 * </ul>
 */
public class NonBlockingInputStreamImpl extends NonBlockingInputStream {
    private InputStream in; // The actual input stream
    private int b = READ_EXPIRED; // Recently read byte

    private String name;
    private boolean threadIsReading = false;
    private IOException exception = null;
    private long threadDelay = 60 * 1000;
    private Thread thread;

    /**
     * Creates a <code>NonBlockingReader</code> out of a normal blocking
     * reader. Note that this call also spawn a separate thread to perform the
     * blocking I/O on behalf of the thread that is using this class. The
     * {@link #shutdown()} method must be called in order to shut this thread down.
     * @param name The stream name
     * @param in The reader to wrap
     */
    public NonBlockingInputStreamImpl(String name, InputStream in) {
        this.in = in;
        this.name = name;
    }

    private synchronized void startReadingThreadIfNeeded() {
        if (thread == null) {
            thread = new Thread(this::run);
            thread.setName(name + " non blocking reader thread");
            thread.setDaemon(true);
            thread.start();
        }
    }

    /**
     * Shuts down the thread that is handling blocking I/O. Note that if the
     * thread is currently blocked waiting for I/O it will not actually
     * shut down until the I/O is received.
     */
    public synchronized void shutdown() {
        if (thread != null) {
            notify();
        }
    }

    @Override
    public void close() throws IOException {
        /*
         * The underlying input stream is closed first. This means that if the
         * I/O thread was blocked waiting on input, it will be woken for us.
         */
        in.close();
        shutdown();
    }

    /**
     * Attempts to read a byte from the input stream for a specific
     * period of time.
     * @param timeout The amount of time to wait for the character
     * @param isPeek <code>true</code>if the byte read must not be consumed
     * @return The byte read, -1 if EOF is reached, or -2 if the
     *   read timed out.
     * @throws IOException if anything wrong happens
     */
    public synchronized int read(long timeout, boolean isPeek) throws IOException {
        /*
         * If the thread hit an IOException, we report it.
         */
        if (exception != null) {
            assert b == READ_EXPIRED;
            IOException toBeThrown = exception;
            if (!isPeek) exception = null;
            throw toBeThrown;
        }

        /*
         * If there was a pending character from the thread, then
         * we send it. If the timeout is 0L or the thread was shut down
         * then do a local read.
         */
        if (b >= -1) {
            assert exception == null;
        } else if (!isPeek && timeout <= 0L && !threadIsReading) {
            b = in.read();
        } else {
            /*
             * If the thread isn't reading already, then ask it to do so.
             */
            if (!threadIsReading) {
                threadIsReading = true;
                startReadingThreadIfNeeded();
                notifyAll();
            }

            /*
             * So the thread is currently doing the reading for us. So
             * now we play the waiting game.
             */
            Timeout t = new Timeout(timeout);
            while (!t.elapsed()) {
                try {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    wait(t.timeout());
                } catch (InterruptedException e) {
                    exception = (IOException) new InterruptedIOException().initCause(e);
                }

                if (exception != null) {
                    assert b == READ_EXPIRED;

                    IOException toBeThrown = exception;
                    if (!isPeek) exception = null;
                    throw toBeThrown;
                }

                if (b >= -1) {
                    assert exception == null;
                    break;
                }
            }
        }

        /*
         * b is the character that was just read. Either we set it because
         * a local read was performed or the read thread set it (or failed to
         * change it).  We will return it's value, but if this was a peek
         * operation, then we leave it in place.
         */
        int ret = b;
        if (!isPeek) {
            b = READ_EXPIRED;
        }
        return ret;
    }

    private void run() {
        Log.debug("NonBlockingInputStream start");
        boolean needToRead;

        try {
            while (true) {

                /*
                 * Synchronize to grab variables accessed by both this thread
                 * and the accessing thread.
                 */
                synchronized (this) {
                    needToRead = this.threadIsReading;

                    try {
                        /*
                         * Nothing to do? Then wait.
                         */
                        if (!needToRead) {
                            wait(threadDelay);
                        }
                    } catch (InterruptedException e) {
                        /* IGNORED */
                    }

                    needToRead = this.threadIsReading;
                    if (!needToRead) {
                        return;
                    }
                }

                /*
                 * We're not shutting down, but we need to read. This cannot
                 * happen while we are holding the lock (which we aren't now).
                 */
                int byteRead = READ_EXPIRED;
                IOException failure = null;
                try {
                    byteRead = in.read();
                } catch (IOException e) {
                    failure = e;
                }

                /*
                 * Re-grab the lock to update the state.
                 */
                synchronized (this) {
                    exception = failure;
                    b = byteRead;
                    threadIsReading = false;
                    notify();
                }

                // If end of stream, exit the loop thread
                if (byteRead < 0) {
                    return;
                }
            }
        } catch (Throwable t) {
            Log.warn("Error in NonBlockingInputStream thread", t);
        } finally {
            Log.debug("NonBlockingInputStream shutdown");
            synchronized (this) {
                thread = null;
                threadIsReading = false;
            }
        }
    }
}