SnappyFramedOutputStream.java

/*
 * Created: Apr 12, 2013
 */
package org.xerial.snappy;

import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.zip.Checksum;

import org.xerial.snappy.pool.BufferPool;
import org.xerial.snappy.pool.DefaultPoolFactory;

/**
 * Implements the <a
 * href="https://github.com/google/snappy/blob/main/framing_format.txt"
 * >x-snappy-framed</a> format as an {@link OutputStream} and
 * {@link WritableByteChannel}.
 * <p>
 * Raw bytes are collected in an internal block buffer of {@code blockSize}
 * bytes. Each time the block fills up, and on {@link #flush()} /
 * {@link #close()}, the buffered bytes are checksummed, compressed and
 * written to the underlying channel as a single frame. If the achieved
 * compression ratio is worse than the configured minimum, the frame carries
 * the raw bytes instead of the compressed ones.
 * </p>
 * <p>
 * Instances hold unsynchronized mutable state; this class performs no
 * locking of its own.
 * </p>
 *
 * @author Brett Okken
 * @since 1.1.0
 */
public final class SnappyFramedOutputStream
        extends OutputStream
        implements
        WritableByteChannel
{

    /**
     * The x-snappy-framed specification allows for a chunk size up to
     * 16,777,211 bytes in length. However, it also goes on to state:
     * <p>
     * <code>
     * We place an additional restriction that the uncompressed data in a chunk
     * must be no longer than 65536 bytes. This allows consumers to easily use
     * small fixed-size buffers.
     * </code>
     * </p>
     */
    public static final int MAX_BLOCK_SIZE = 64 * 1024;

    /**
     * The default block size to use.
     */
    public static final int DEFAULT_BLOCK_SIZE = MAX_BLOCK_SIZE;

    /**
     * The default min compression ratio to use.
     */
    public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d;

    private final Checksum crc32 = SnappyFramed.getCRC32C();

    // 1 byte frame type + 3 bytes little-endian length + 4 bytes masked crc32c
    private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order(
            ByteOrder.LITTLE_ENDIAN);
    private final BufferPool bufferPool;
    private final int blockSize;

    // heap buffer accumulating raw user data until a full block is available
    private final ByteBuffer buffer;
    // direct staging buffers handed to Snappy.compress(ByteBuffer, ByteBuffer)
    private final ByteBuffer directInputBuffer;
    private final ByteBuffer outputBuffer;
    private final double minCompressionRatio;

    private final WritableByteChannel out;

    private boolean closed;

    /**
     * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
     * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
     * <p>
     * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
     * </p>
     *
     * @param out The underlying {@link OutputStream} to write to. Must not be
     * {@code null}.
     * @throws IOException If the header cannot be written to <i>out</i>.
     */
    public SnappyFramedOutputStream(OutputStream out)
            throws IOException
    {
        this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
     * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
     *
     * @param out The underlying {@link OutputStream} to write to. Must not be
     * {@code null}.
     * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
     * @throws IOException If the header cannot be written to <i>out</i>.
     */
    public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool)
            throws IOException
    {
        this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} instance.
     * <p>
     * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
     * </p>
     *
     * @param out The underlying {@link OutputStream} to write to. Must not be
     * {@code null}.
     * @param blockSize The block size (of raw data) to compress before writing frames
     * to <i>out</i>. Must be in (0, 65536].
     * @param minCompressionRatio Defines the minimum compression ratio (
     * {@code compressedLength / rawLength}) that must be achieved to
     * write the compressed data. This must be in (0, 1.0].
     * @throws IOException If the header cannot be written to <i>out</i>.
     */
    public SnappyFramedOutputStream(OutputStream out, int blockSize,
            double minCompressionRatio)
            throws IOException
    {
        this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} instance.
     *
     * @param out The underlying {@link OutputStream} to write to. Must not be
     * {@code null}.
     * @param blockSize The block size (of raw data) to compress before writing frames
     * to <i>out</i>. Must be in (0, 65536].
     * @param minCompressionRatio Defines the minimum compression ratio (
     * {@code compressedLength / rawLength}) that must be achieved to
     * write the compressed data. This must be in (0, 1.0].
     * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
     * @throws IOException If the header cannot be written to <i>out</i>.
     */
    public SnappyFramedOutputStream(OutputStream out, int blockSize,
            double minCompressionRatio, BufferPool bufferPool)
            throws IOException
    {
        this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool);
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} using the
     * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
     * <p>
     * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
     * </p>
     *
     * @param out The underlying {@link WritableByteChannel} to write to. Must
     * not be {@code null}.
     * @throws IOException If the header cannot be written to <i>out</i>.
     * @since 1.1.1
     */
    public SnappyFramedOutputStream(WritableByteChannel out)
            throws IOException
    {
        this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} using the
     * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO},
     * obtaining its buffers from the supplied <i>bufferPool</i>.
     *
     * @param out The underlying {@link WritableByteChannel} to write to. Must
     * not be {@code null}.
     * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
     * @throws IOException If the header cannot be written to <i>out</i>.
     */
    public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool)
            throws IOException
    {
        this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} instance.
     * <p>
     * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
     * </p>
     *
     * @param out The underlying {@link WritableByteChannel} to write to. Must
     * not be {@code null}.
     * @param blockSize The block size (of raw data) to compress before writing frames
     * to <i>out</i>. Must be in (0, 65536].
     * @param minCompressionRatio Defines the minimum compression ratio (
     * {@code compressedLength / rawLength}) that must be achieved to
     * write the compressed data. This must be in (0, 1.0].
     * @throws IOException If the header cannot be written to <i>out</i>.
     * @since 1.1.1
     */
    public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
            double minCompressionRatio)
            throws IOException
    {
        this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
    }

    /**
     * Creates a new {@link SnappyFramedOutputStream} instance and immediately
     * writes the format's header bytes to <i>out</i>.
     *
     * @param out The underlying {@link WritableByteChannel} to write to. Must
     * not be {@code null}.
     * @param blockSize The block size (of raw data) to compress before writing frames
     * to <i>out</i>. Must be in (0, 65536].
     * @param minCompressionRatio Defines the minimum compression ratio (
     * {@code compressedLength / rawLength}) that must be achieved to
     * write the compressed data. This must be in (0, 1.0].
     * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
     * @throws NullPointerException If <i>out</i> or <i>bufferPool</i> is {@code null}.
     * @throws IllegalArgumentException If <i>blockSize</i> or
     * <i>minCompressionRatio</i> is outside its documented range.
     * @throws IOException If the header cannot be written to <i>out</i>.
     * @since 1.1.1
     */
    public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
            double minCompressionRatio, BufferPool bufferPool)
            throws IOException
    {
        if (out == null) {
            throw new NullPointerException("out is null");
        }

        if (bufferPool == null) {
            throw new NullPointerException("buffer pool is null");
        }

        // written as a negated range check so that NaN is rejected as well
        if (!(minCompressionRatio > 0 && minCompressionRatio <= 1.0)) {
            throw new IllegalArgumentException("minCompressionRatio "
                    + minCompressionRatio + " must be in (0,1.0]");
        }

        if (blockSize <= 0 || blockSize > MAX_BLOCK_SIZE) {
            throw new IllegalArgumentException("block size " + blockSize
                    + " must be in (0, 65536]");
        }
        this.blockSize = blockSize;
        this.out = out;
        this.minCompressionRatio = minCompressionRatio;

        this.bufferPool = bufferPool;
        // the pooled array may be larger than requested, so the limit is
        // pinned to blockSize explicitly
        buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize);
        directInputBuffer = bufferPool.allocateDirect(blockSize);
        outputBuffer = bufferPool.allocateDirect(Snappy
                .maxCompressedLength(blockSize));

        boolean headerWritten = false;
        try {
            writeHeader(out);
            headerWritten = true;
        }
        finally {
            if (!headerWritten) {
                // construction failed: nobody will ever call close(), so hand
                // the pooled buffers back right away
                closed = true;
                releaseBuffers();
            }
        }
    }

    /**
     * Writes the implementation specific header or "marker bytes" to
     * <i>out</i>.
     *
     * @param out The underlying {@link WritableByteChannel}.
     * @throws IOException If writing to <i>out</i> fails.
     */
    private void writeHeader(WritableByteChannel out)
            throws IOException
    {
        writeFully(out, ByteBuffer.wrap(HEADER_BYTES));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isOpen()
    {
        return !closed;
    }

    /**
     * Buffers a single byte, flushing the current block first if it is full.
     *
     * @param b The byte to write (only the low 8 bits are used).
     * @throws IOException If this stream is closed or a block cannot be written.
     */
    @Override
    public void write(int b)
            throws IOException
    {
        if (closed) {
            throw new IOException("Stream is closed");
        }
        if (buffer.remaining() <= 0) {
            flushBuffer();
        }
        buffer.put((byte) b);
    }

    /**
     * Buffers <i>length</i> bytes of <i>input</i> starting at <i>offset</i>,
     * emitting a frame every time the internal block fills up.
     *
     * @param input The source array. Must not be {@code null}.
     * @param offset Index of the first byte to write.
     * @param length Number of bytes to write.
     * @throws IOException If this stream is closed or a block cannot be written.
     * @throws IndexOutOfBoundsException If <i>offset</i>/<i>length</i> do not
     * describe a valid range of <i>input</i>.
     */
    @Override
    public void write(byte[] input, int offset, int length)
            throws IOException
    {
        if (closed) {
            throw new IOException("Stream is closed");
        }

        if (input == null) {
            throw new NullPointerException();
        }
        else if ((offset < 0) || (offset > input.length) || (length < 0)
                || ((offset + length) > input.length)
                || ((offset + length) < 0)) {
            // the last clause catches int overflow of offset + length
            throw new IndexOutOfBoundsException();
        }

        while (length > 0) {
            if (buffer.remaining() <= 0) {
                flushBuffer();
            }

            final int toPut = Math.min(length, buffer.remaining());
            buffer.put(input, offset, toPut);
            offset += toPut;
            length -= toPut;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * All remaining bytes of <i>src</i> are consumed. If writing a block
     * fails part way through, the limit of <i>src</i> is restored to its
     * original value before the exception propagates.
     * </p>
     */
    @Override
    public int write(ByteBuffer src)
            throws IOException
    {
        if (closed) {
            throw new ClosedChannelException();
        }

        if (buffer.remaining() <= 0) {
            flushBuffer();
        }

        final int srcLength = src.remaining();

        // easy case: enough free space in buffer for entire input
        if (buffer.remaining() >= srcLength) {
            buffer.put(src);
            return srcLength;
        }

        // store current limit
        final int srcEnd = src.position() + srcLength;

        try {
            while ((src.position() + buffer.remaining()) <= srcEnd) {
                // narrow src to exactly what still fits, fill the block and
                // flush it
                src.limit(src.position() + buffer.remaining());
                buffer.put(src);
                flushBuffer();
            }
        }
        finally {
            // reset original limit, even if flushing a block failed, so the
            // caller's buffer is never left truncated
            src.limit(srcEnd);
        }

        // copy remaining partial block into now-empty buffer
        buffer.put(src);

        return srcLength;
    }

    /**
     * Transfers all the content from <i>is</i> to this {@link OutputStream}.
     * This potentially limits the amount of buffering required to compress
     * content.
     * <p>
     * Data is read straight into the backing array of the internal block
     * buffer until <i>is</i> reports end of stream. A trailing partial block
     * stays buffered until the next flush or close.
     * </p>
     *
     * @param is The source of data to compress.
     * @return The number of bytes read from <i>is</i>.
     * @throws IOException If this stream is closed, or reading or writing fails.
     * @since 1.1.1
     */
    public long transferFrom(InputStream is)
            throws IOException
    {
        if (closed) {
            throw new ClosedChannelException();
        }

        if (is == null) {
            throw new NullPointerException();
        }

        if (buffer.remaining() == 0) {
            flushBuffer();
        }

        assert buffer.hasArray();
        final byte[] bytes = buffer.array();

        final int arrayOffset = buffer.arrayOffset();
        long totTransfered = 0;
        int read;
        while ((read = is.read(bytes, arrayOffset + buffer.position(),
                buffer.remaining())) != -1) {
            // the read bypassed the ByteBuffer API, so advance the position
            // by hand
            buffer.position(buffer.position() + read);

            if (buffer.remaining() == 0) {
                flushBuffer();
            }

            totTransfered += read;
        }

        return totTransfered;
    }

    /**
     * Transfers all the content from <i>rbc</i> to this
     * {@link WritableByteChannel}. This potentially limits the amount of
     * buffering required to compress content.
     * <p>
     * The loop only ends when <i>rbc</i> reports end of stream ({@code -1});
     * a channel that keeps returning {@code 0} is polled repeatedly.
     * </p>
     *
     * @param rbc The source of data to compress.
     * @return The number of bytes read from <i>rbc</i>.
     * @throws IOException If this stream is closed, or reading or writing fails.
     * @since 1.1.1
     */
    public long transferFrom(ReadableByteChannel rbc)
            throws IOException
    {
        if (closed) {
            throw new ClosedChannelException();
        }

        if (rbc == null) {
            throw new NullPointerException();
        }

        if (buffer.remaining() == 0) {
            flushBuffer();
        }

        long totTransfered = 0;
        int read;
        while ((read = rbc.read(buffer)) != -1) {
            if (buffer.remaining() == 0) {
                flushBuffer();
            }

            totTransfered += read;
        }

        return totTransfered;
    }

    /**
     * Compresses and writes any buffered data as a frame. The underlying
     * channel itself is not flushed.
     *
     * @throws IOException If this stream is closed or the frame cannot be written.
     */
    @Override
    public final void flush()
            throws IOException
    {
        if (closed) {
            throw new IOException("Stream is closed");
        }
        flushBuffer();
    }

    /**
     * Flushes any buffered data, closes the underlying channel and returns
     * the pooled buffers. Calling this on an already closed stream is a no-op.
     * <p>
     * The underlying channel is closed and the buffers are released even if
     * the final flush fails; in that case the flush failure is the exception
     * that propagates.
     * </p>
     *
     * @throws IOException If flushing or closing the underlying channel fails.
     */
    @Override
    public final void close()
            throws IOException
    {
        if (closed) {
            return;
        }
        boolean flushed = false;
        try {
            flush();
            flushed = true;
        }
        finally {
            try {
                closeUnderlying(flushed);
            }
            finally {
                closed = true;
                releaseBuffers();
            }
        }
    }

    /**
     * Closes the underlying channel.
     *
     * @param propagateFailure When {@code false} an {@link IOException} from
     * the close is dropped, because another exception is already propagating
     * and must not be masked.
     * @throws IOException If closing fails and <i>propagateFailure</i> is
     * {@code true}.
     */
    private void closeUnderlying(boolean propagateFailure)
            throws IOException
    {
        try {
            out.close();
        }
        catch (IOException e) {
            if (propagateFailure) {
                throw e;
            }
            // intentionally ignored: the earlier flush failure is the primary
            // error reported to the caller
        }
    }

    /**
     * Returns the three pooled buffers to the {@link BufferPool}.
     */
    private void releaseBuffers()
    {
        bufferPool.releaseArray(buffer.array());
        bufferPool.releaseDirect(directInputBuffer);
        bufferPool.releaseDirect(outputBuffer);
    }

    /**
     * Compresses and writes out any buffered data. This does nothing if there
     * is no currently buffered data.
     *
     * @throws IOException If the frame cannot be written.
     */
    private void flushBuffer()
            throws IOException
    {
        if (buffer.position() > 0) {
            buffer.flip();
            writeCompressed(buffer);
            buffer.clear();
            // clear() resets the limit to the capacity of the (possibly
            // larger) pooled array; shrink it back to the block size
            buffer.limit(blockSize);
        }
    }

    /**
     * Computes the masked crc32c of the raw data, compresses the data,
     * determines if the compression ratio is acceptable and calls
     * {@link #writeBlock(WritableByteChannel, ByteBuffer, boolean, int)} to
     * actually write the frame.
     *
     * @param buffer Array-backed buffer whose remaining bytes form the raw
     * block. Must have at least one byte remaining.
     * @throws IOException If compression or writing the frame fails.
     */
    private void writeCompressed(ByteBuffer buffer)
            throws IOException
    {
        final byte[] input = buffer.array();
        final int start = buffer.position();
        final int length = buffer.remaining();

        // crc is based on the user supplied input data
        final int crc32c = maskedCrc32c(crc32, input,
                buffer.arrayOffset() + start, length);

        // stage the raw bytes in the direct buffer used for compression
        directInputBuffer.clear();
        directInputBuffer.put(buffer);
        directInputBuffer.flip();

        outputBuffer.clear();
        Snappy.compress(directInputBuffer, outputBuffer);

        final int compressedLength = outputBuffer.remaining();

        // only use the compressed data if compression ratio is <= the
        // minCompressionRatio
        if (((double) compressedLength / (double) length) <= minCompressionRatio) {
            writeBlock(out, outputBuffer, true, crc32c);
        }
        else {
            // otherwise use the uncompressed data: the put above drained the
            // buffer, so rewind to where the raw block starts
            buffer.position(start);
            writeBlock(out, buffer, false, crc32c);
        }
    }

    /**
     * Write a frame (block) to <i>out</i>.
     *
     * @param out The {@link WritableByteChannel} to write to.
     * @param data The data to write.
     * @param compressed Indicates if <i>data</i> is the compressed or raw content.
     * This is based on whether the compression ratio desired is
     * reached.
     * @param crc32c The calculated checksum.
     * @throws IOException If writing to <i>out</i> fails.
     */
    private void writeBlock(final WritableByteChannel out, ByteBuffer data,
            boolean compressed, int crc32c)
            throws IOException
    {
        headerBuffer.clear();
        headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG
                : UNCOMPRESSED_DATA_FLAG));

        // the length written out to the header is both the checksum and the
        // frame
        final int headerLength = data.remaining() + 4;

        // write length as 3 bytes, least significant first
        headerBuffer.put((byte) headerLength);
        headerBuffer.put((byte) (headerLength >>> 8));
        headerBuffer.put((byte) (headerLength >>> 16));

        // write crc32c of user input data
        headerBuffer.putInt(crc32c);

        headerBuffer.flip();

        // write the header
        writeFully(out, headerBuffer);
        // write the frame payload (compressed or raw)
        writeFully(out, data);
    }

    /**
     * Writes every remaining byte of <i>data</i> to <i>channel</i>. A single
     * {@link WritableByteChannel#write(ByteBuffer)} call may consume only
     * part of the buffer, so the call is repeated until the buffer is drained.
     *
     * @param channel The channel to write to.
     * @param data The bytes to write.
     * @throws IOException If writing to <i>channel</i> fails.
     */
    private static void writeFully(WritableByteChannel channel, ByteBuffer data)
            throws IOException
    {
        while (data.hasRemaining()) {
            channel.write(data);
        }
    }
}