// CompressInputStream.java

// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2025 MariaDB Corporation Ab
package org.mariadb.jdbc.client.socket.impl;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.mariadb.jdbc.client.util.MutableByte;

/**
 * Compression handler, permitting decompression of MySQL packets when needed. When compression is
 * enabled, every payload is preceded by a 7-byte header used to identify whether the payload is
 * compressed or not:
 *
 * <ul>
 *   <li>bytes 0-2: length of the payload as transmitted (little-endian)
 *   <li>byte 3: compression sequence number
 *   <li>bytes 4-6: length of the payload once uncompressed (little-endian); {@code 0} means the
 *       payload was transmitted uncompressed
 * </ul>
 */
public class CompressInputStream extends InputStream {
  private final InputStream in;
  private final MutableByte sequence;

  private final byte[] header = new byte[7];

  /** Number of valid bytes in {@link #buf}. */
  private int end;

  /** Index in {@link #buf} of the next byte to hand out. */
  private int pos;

  /** Uncompressed content of the packet currently being consumed. */
  private volatile byte[] buf;

  /**
   * Constructor. When this handler is used, driver expect packet with 7 byte compression header
   *
   * <p>Implementation doesn't use synchronized/semaphore because all used are already locked by
   * Statement/PreparedStatement Reentrant lock
   *
   * @param in socket input stream
   * @param compressionSequence compression sequence
   */
  public CompressInputStream(InputStream in, MutableByte compressionSequence) {
    this.in = in;
    this.sequence = compressionSequence;
  }

  /**
   * Reads up to {@code len} uncompressed bytes into {@code b}, starting at {@code b[off]}.
   *
   * <p>When the internal buffer is exhausted, the next packet is fetched from the underlying stream
   * (blocking if needed) and decompressed. A single call never spans two non-empty packets: it
   * returns at most the bytes remaining in the current packet, so fewer than {@code len} bytes may
   * be returned and callers must loop to fill a buffer.
   *
   * @param b the buffer into which the data is read.
   * @param off the start offset in array {@code b} at which the data is written.
   * @param len the maximum number of bytes to read.
   * @return {@code 0} if {@code len} is zero, otherwise the number of bytes read (at least one).
   * @throws EOFException if the underlying stream ends before a complete packet has been read.
   * @throws IOException if the underlying stream fails or the packet cannot be decompressed.
   * @throws NullPointerException if {@code b} is {@code null}.
   * @throws IndexOutOfBoundsException if the requested range does not fit in {@code b}.
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }

    ensureBuffered();
    // The previous implementation looped on super.available(), which is InputStream.available()
    // and always returns 0: exactly one pass was ever executed. This is that single pass.
    int copyLength = Math.min(len, end - pos);
    System.arraycopy(buf, pos, b, off, copyLength);
    pos += copyLength;
    return copyLength;
  }

  /**
   * Reads the next uncompressed byte, fetching and decompressing the next packet when the internal
   * buffer is exhausted.
   *
   * @return the next byte of data, in the range {@code 0} to {@code 255}.
   * @throws EOFException if the underlying stream ends before a complete packet has been read.
   * @throws IOException if the underlying stream fails or the packet cannot be decompressed.
   */
  @Override
  public int read() throws IOException {
    ensureBuffered();
    return buf[pos++] & 0xff;
  }

  /**
   * Skips over up to {@code n} uncompressed bytes. As for {@link #read(byte[], int, int)}, a single
   * call never goes past the end of the current packet, so fewer than {@code n} bytes may be
   * skipped.
   *
   * @param n the number of bytes to be skipped.
   * @return the actual number of bytes skipped; {@code 0} if {@code n} is zero or negative.
   * @throws EOFException if the underlying stream ends before a complete packet has been read.
   * @throws IOException if the underlying stream fails or the packet cannot be decompressed.
   */
  @Override
  public long skip(long n) throws IOException {
    if (n <= 0) {
      return 0;
    }
    ensureBuffered();
    // compare as long so that values above Integer.MAX_VALUE are not truncated
    int skipped = (int) Math.min(n, (long) (end - pos));
    pos += skipped;
    return skipped;
  }

  /**
   * Returns the estimate reported by the underlying stream. Bytes already decompressed and held in
   * the internal buffer are not counted.
   *
   * @return the value of {@code available()} of the underlying stream.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public int available() throws IOException {
    return in.available();
  }

  /**
   * Closes the underlying stream.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void close() throws IOException {
    in.close();
  }

  /**
   * Marks the current position of the underlying stream. The internal decompression buffer is not
   * part of the mark.
   *
   * @param readlimit the maximum limit of bytes that can be read before the mark position becomes
   *     invalid.
   * @see InputStream#reset()
   */
  @Override
  public void mark(int readlimit) {
    in.mark(readlimit);
  }

  /**
   * Resets the underlying stream to its last marked position. The internal decompression buffer is
   * left untouched.
   *
   * @throws IOException if the underlying stream has not been marked or if the mark has been
   *     invalidated.
   * @see InputStream#mark(int)
   */
  @Override
  public void reset() throws IOException {
    in.reset();
  }

  /**
   * Tests if the underlying stream supports the {@code mark} and {@code reset} methods.
   *
   * @return the value of {@code markSupported()} of the underlying stream.
   * @see InputStream#mark(int)
   * @see InputStream#reset()
   */
  @Override
  public boolean markSupported() {
    return in.markSupported();
  }

  /** Fetches packets until at least one unread byte is buffered (empty packets are skipped). */
  private void ensureBuffered() throws IOException {
    while (end - pos <= 0) {
      retrieveBuffer();
    }
  }

  /** Reads the next packet (header then payload) and makes its uncompressed content current. */
  private void retrieveBuffer() throws IOException {
    readFully(header, header.length);

    int compressedPacketLength = readUnsignedInt24(header, 0);
    sequence.set(header[3]);
    int packetLength = readUnsignedInt24(header, 4);

    byte[] payload = new byte[compressedPacketLength];
    readFully(payload, compressedPacketLength);

    if (packetLength != 0) {
      buf = inflate(payload, packetLength);
      end = packetLength;
    } else {
      // an uncompressed length of 0 means the payload was transmitted as is
      buf = payload;
      end = compressedPacketLength;
    }
    pos = 0;
  }

  /** Blocks until exactly {@code length} bytes have been read into the start of {@code target}. */
  private void readFully(byte[] target, int length) throws IOException {
    int offset = 0;
    while (offset < length) {
      int count = in.read(target, offset, length - offset);
      if (count < 0) {
        throw new EOFException(
            "unexpected end of stream, read "
                + offset
                + " bytes from "
                + length
                + " (socket was closed by server)");
      }
      offset += count;
    }
  }

  /** Decodes the 3-byte little-endian unsigned integer stored at {@code src[offset]}. */
  private static int readUnsignedInt24(byte[] src, int offset) {
    return (src[offset] & 0xff)
        + ((src[offset + 1] & 0xff) << 8)
        + ((src[offset + 2] & 0xff) << 16);
  }

  /** Inflates {@code compressed}, which must expand to exactly {@code expectedLength} bytes. */
  private static byte[] inflate(byte[] compressed, int expectedLength) throws IOException {
    byte[] inflated = new byte[expectedLength];
    Inflater inflater = new Inflater();
    try {
      inflater.setInput(compressed);
      int actualUncompressBytes = inflater.inflate(inflated);
      if (actualUncompressBytes != expectedLength) {
        throw new IOException(
            "Invalid exception length after decompression "
                + actualUncompressBytes
                + ",expected "
                + expectedLength);
      }
      return inflated;
    } catch (DataFormatException dfe) {
      throw new IOException(dfe);
    } finally {
      // always release the native zlib state, including on failure
      inflater.end();
    }
  }
}