InflatingZstdDataConsumer.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.async.methods;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import com.github.luben.zstd.ZstdDecompressCtx;

import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;

/**
 * {@code AsyncDataConsumer} that inflates Zstandard (zstd) content codings.
 *
 * <p>This consumer accepts compressed bytes and forwards decompressed data to a downstream
 * {@link org.apache.hc.core5.http.nio.AsyncDataConsumer}. It is intended to be installed by the
 * client execution chain when a response carries {@code Content-Encoding: zstd}. Applications
 * normally do not instantiate it directly���enable content compression (default) and let
 * {@code ContentCompressionAsyncExec} wire it automatically.</p>
 *
 * <h3>Behavior</h3>
 * <ul>
 *   <li>Streams decompression as data arrives; does not require the full message in memory.</li>
 *   <li>Updates the downstream with plain bytes; the client removes the original
 *       {@code Content-Encoding} and related headers.</li>
 *   <li>On malformed input it throws an {@link java.io.IOException} with a descriptive message.</li>
 *   <li>{@link #releaseResources()} must be called to free native resources.</li>
 * </ul>
 *
 * <h3>Typical wiring (automatic)</h3>
 * <pre>{@code
 * // Enabled by default; when the response has Content-Encoding: zstd
 * // the execution chain wraps the application consumer:
 * client = HttpAsyncClients.createDefault();
 * }</pre>
 *
 * <h3>Direct use (advanced)</h3>
 * <pre>{@code
 * AsyncDataConsumer app = ...; // where you want plain bytes
 * AsyncDataConsumer zstd = new InflatingZstdDataConsumer(app);
 * // feed zstd.consume(ByteBuffer) with compressed bytes
 * }</pre>
 *
 * @since 5.6
 */

public final class InflatingZstdDataConsumer implements AsyncDataConsumer {

    private static final int IN_BUF = 64 * 1024;
    private static final int OUT_BUF = 128 * 1024; // ~ZSTD_DStreamOutSize(); tweak if you like. :contentReference[oaicite:3]{index=3}

    private final AsyncDataConsumer downstream;
    private final ZstdDecompressCtx dctx = new ZstdDecompressCtx();

    private final ByteBuffer inDirect = ByteBuffer.allocateDirect(IN_BUF);
    private final ByteBuffer outDirect = ByteBuffer.allocateDirect(OUT_BUF);

    private final AtomicBoolean closed = new AtomicBoolean(false);

    public InflatingZstdDataConsumer(final AsyncDataConsumer downstream) {
        this.downstream = downstream;
        inDirect.limit(0);
        outDirect.limit(0);
    }

    @Override
    public void updateCapacity(final CapacityChannel c) throws IOException {
        downstream.updateCapacity(c);
    }

    @Override
    public void consume(final ByteBuffer src) throws IOException {
        if (closed.get()) {
            return;
        }

        // Copy any incoming bytes into the direct input buffer, draining as we go.
        while (src.hasRemaining()) {
            inDirect.compact();
            final int take = Math.min(inDirect.remaining(), src.remaining());
            final int oldLimit = src.limit();
            src.limit(src.position() + take);
            inDirect.put(src);
            src.limit(oldLimit);
            inDirect.flip();

            // Pull decompressed bytes until we either need more input or downstream back-pressures
            while (inDirect.hasRemaining()) {
                outDirect.compact();
                // Streaming decompress: fills outDirect from inDirect; returns when either
                // input exhausted or output buffer full.
                dctx.decompressDirectByteBufferStream(outDirect, inDirect);
                outDirect.flip();
                if (outDirect.hasRemaining()) {
                    downstream.consume(outDirect);
                    if (outDirect.hasRemaining()) {
                        // downstream applied back-pressure; stop here, we���ll resume on next callback
                        return;
                    }
                } else {
                    break; // need more input
                }
            }
        }
    }

    @Override
    public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
        if (closed.compareAndSet(false, true)) {
            dctx.close();
            downstream.streamEnd(trailers);
        }
    }

    @Override
    public void releaseResources() {
        dctx.close();
        downstream.releaseResources();
    }
}