DeflatingGzipEntityProducer.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.async.methods;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.util.Args;

/**
 * Streams an {@link AsyncEntityProducer} through raw DEFLATE
 * and wraps the result in a valid GZIP container.
 * <p>
 * Memory usage is bounded (8 KiB buffers) and back-pressure
 * from the I/O reactor is honoured.
 *
 * @since 5.6
 */
public final class DeflatingGzipEntityProducer implements AsyncEntityProducer {

    /* ---------------- constants & buffers --------------------------- */

    private static final int IN_BUF = 8 * 1024;
    private static final int OUT_BUF = 8 * 1024;

    private final AsyncEntityProducer delegate;
    private final CRC32 crc = new CRC32();
    private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
    private final byte[] in = new byte[IN_BUF];
    private final ByteBuffer outBuf = ByteBuffer.allocate(OUT_BUF);

    private boolean headerSent = false;
    private boolean finished = false;
    private long uncompressed = 0;

    private final AtomicBoolean released = new AtomicBoolean(false);

    public DeflatingGzipEntityProducer(final AsyncEntityProducer delegate) {
        this.delegate = Args.notNull(delegate, "delegate");
        outBuf.flip(); // start in ���read mode��� with no data
    }

    /* ------------------- metadata ------------------- */

    @Override
    public boolean isRepeatable() {
        return delegate.isRepeatable();
    }

    @Override
    public long getContentLength() {
        return -1;
    } // unknown

    @Override
    public String getContentType() {
        return delegate.getContentType();
    }

    @Override
    public String getContentEncoding() {
        return "gzip";
    }

    @Override
    public boolean isChunked() {
        return true;
    }

    @Override
    public Set<String> getTrailerNames() {
        return Collections.emptySet();
    }

    @Override
    public int available() {
        return outBuf.hasRemaining() ? outBuf.remaining() : delegate.available();
    }

    /* ------------------- core ----------------------- */

    @Override
    public void produce(final DataStreamChannel chan) throws IOException {

        flushOut(chan);                    // 1) flush any pending data

        if (finished) {
            return;                        // already done
        }

        delegate.produce(new InnerChannel(chan)); // 2) pull more input

        /* 3) when delegate is done ��� finish deflater, drain, trailer */
        if (delegate.available() == 0 && !finished) {

            deflater.finish();             // signal EOF to compressor
            while (!deflater.finished()) { // drain *everything*
                deflateToOut();
                flushOut(chan);
            }

            /* ---------------- little-endian trailer ---------------- */
            final int crcVal = (int) crc.getValue();
            final int size = (int) uncompressed;

            final byte[] trailer = {
                    (byte) crcVal, (byte) (crcVal >>> 8),
                    (byte) (crcVal >>> 16), (byte) (crcVal >>> 24),
                    (byte) size, (byte) (size >>> 8),
                    (byte) (size >>> 16), (byte) (size >>> 24)
            };
            chan.write(ByteBuffer.wrap(trailer));

            finished = true;
            chan.endStream();
        }
    }

    /* copy all currently available bytes from deflater into outBuf */
    private void deflateToOut() {
        outBuf.compact();                  // switch to ���write mode���
        byte[] arr = outBuf.array();
        int pos = outBuf.position();
        int lim = outBuf.limit();
        int n;
        while ((n = deflater.deflate(arr, pos, lim - pos, Deflater.NO_FLUSH)) > 0) {
            pos += n;
            if (pos == lim) {              // buffer full ��� grow 2��
                final ByteBuffer bigger = ByteBuffer.allocate(arr.length * 2);
                outBuf.flip();
                bigger.put(outBuf);
                outBuf.clear();
                outBuf.put(bigger);
                arr = outBuf.array();
                lim = outBuf.limit();
                pos = outBuf.position();
            }
        }
        outBuf.position(pos);
        outBuf.flip();                     // back to ���read mode���
    }

    /* send as much of outBuf as the channel will accept */
    private void flushOut(final DataStreamChannel chan) throws IOException {
        while (outBuf.hasRemaining()) {
            final int written = chan.write(outBuf);
            if (written == 0) {
                break; // back-pressure
            }
        }
    }

    /* --------------- inner channel feeding deflater ---------------- */

    private final class InnerChannel implements DataStreamChannel {
        private final DataStreamChannel chan;

        InnerChannel(final DataStreamChannel chan) {
            this.chan = chan;
        }

        @Override
        public void requestOutput() {
            chan.requestOutput();
        }

        @Override
        public int write(final ByteBuffer src) throws IOException {

            if (!headerSent) {             // write 10-byte GZIP header
                chan.write(ByteBuffer.wrap(new byte[]{
                        0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0
                }));
                headerSent = true;
            }

            int consumed = 0;
            while (src.hasRemaining()) {
                final int chunk = Math.min(src.remaining(), in.length);
                src.get(in, 0, chunk);

                crc.update(in, 0, chunk);
                uncompressed += chunk;

                deflater.setInput(in, 0, chunk);
                consumed += chunk;

                deflateToOut();
                flushOut(chan);
            }
            return consumed;
        }

        @Override
        public void endStream() { /* delegate.available()==0 is our signal */ }

        @Override
        public void endStream(final List<? extends Header> t) {
            endStream();
        }
    }

    /* ---------------- failure / cleanup ---------------------------- */

    @Override
    public void failed(final Exception cause) {
        delegate.failed(cause);
    }

    @Override
    public void releaseResources() {
        if (released.compareAndSet(false, true)) {
            deflater.end();
            delegate.releaseResources();
        }
    }
}