NettyInputStream.java

/*
 * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.netty.connector.internal;

import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

import io.netty.buffer.ByteBuf;

/**
 * Input stream which servers as Request entity input.
 * <p>
 * Converts Netty NIO buffers to an input streams and stores them in the queue,
 * waiting for Jersey to process it.
 *
 * @author Pavel Bucek
 */
public class NettyInputStream extends InputStream {

    private volatile boolean end = false;
    private Throwable cause;

    private final ArrayDeque<ByteBuf> isList;
    private ByteBuf current;
    private ByteBuffer buffer;

    private byte[] ONE_BYTE;
    private boolean reading;

    public NettyInputStream() {
        this.isList = new ArrayDeque<>();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
       if (current == null) {
          buffer = awaitNext();
          if (buffer == null) {
             // assert: end is true
             if (cause == null) {
                return -1;
             }

             throw new IOException(cause);
          }
       }

       int rem = buffer.remaining();
       if (rem < len) {
          len = rem;
       }
       buffer.get(b, off, len);
       if (rem == len) {
          releaseByteBuf();
       }

       return len;
    }

    @Override
    public int read() throws IOException {
       if (ONE_BYTE == null) {
          ONE_BYTE = new byte[1];
       }
       int r = read(ONE_BYTE, 0, 1);
       if (r < 0) {
          return r;
       }

       return ONE_BYTE[0] & 0xff;
    }

    @Override
    public void close() {

        releaseByteBuf();

        cleanup(true);
    }

    private void releaseByteBuf() {
        if (current != null) {
            current.release();
        }

        current = null;
        buffer = null;
    }

    protected synchronized ByteBuffer awaitNext() {
       while (isList.isEmpty()) {
          if (end) {
             return null;
          }

          try {
             reading = true;
             wait();
             reading = false;
          } catch (InterruptedException ie) {
             // waiting uninterruptibly
          }
       }

       current = isList.poll();
       return current.nioBuffer().asReadOnlyBuffer();
    }

    public void complete(Throwable cause) {
       this.cause = cause;
       cleanup(cause != null);
    }

    protected synchronized void cleanup(boolean drain) {
       if (drain) {
          while (!isList.isEmpty()) {
             isList.poll().release();
          }
       }

       end = true;

       if (reading) {
          notifyAll();
       }
    }

    @Override
    public int available() throws IOException {
        return buffer == null ? 0 : buffer.remaining();
    }

    public synchronized void publish(ByteBuf content) {
       if (end || content.nioBuffer().remaining() == 0) {
          content.release();
          return;
       }

       isList.add(content);
       if (reading) {
          notifyAll();
       }
    }

    public void clear() {
        end = false;
        reading = false;

        cause = null;
        buffer = null;
        current = null;

        isList.clear();
    }
}