TestLz4CompressorDecompressor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.compress.lz4;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.jupiter.api.Test;

public class TestLz4CompressorDecompressor {

  /**
   * Shared payload generator with a fixed seed. {@link Random} is thread-safe,
   * so it may be used from the multi-threaded test, although the interleaving
   * then makes individual payloads non-reproducible.
   */
  private static final Random rnd = new Random(12345L);

  // Each exception test wraps ONLY the call under test in assertThrows, so an
  // exception of the expected type thrown by setup code cannot make it pass.

  // test on NullPointerException in {@code compressor.setInput()}
  @Test
  public void testCompressorSetInputNullPointerException() {
    Lz4Compressor compressor = new Lz4Compressor();
    assertThrows(NullPointerException.class,
        () -> compressor.setInput(null, 0, 10),
        "testCompressorSetInputNullPointerException error !!!");
  }

  // test on NullPointerException in {@code decompressor.setInput()}
  @Test
  public void testDecompressorSetInputNullPointerException() {
    Lz4Decompressor decompressor = new Lz4Decompressor();
    assertThrows(NullPointerException.class,
        () -> decompressor.setInput(null, 0, 10),
        "testDecompressorSetInputNullPointerException error !!!");
  }

  // test on ArrayIndexOutOfBoundsException in {@code compressor.setInput()}
  @Test
  public void testCompressorSetInputAIOBException() {
    Lz4Compressor compressor = new Lz4Compressor();
    assertThrows(ArrayIndexOutOfBoundsException.class,
        () -> compressor.setInput(new byte[] {}, -5, 10),
        "testCompressorSetInputAIOBException error !!!");
  }

  // test on ArrayIndexOutOfBoundsException in {@code decompressor.setInput()}
  @Test
  public void testDecompressorSetInputAIOUBException() {
    Lz4Decompressor decompressor = new Lz4Decompressor();
    assertThrows(ArrayIndexOutOfBoundsException.class,
        () -> decompressor.setInput(new byte[] {}, -5, 10),
        "testDecompressorSetInputAIOBException error !!!");
  }

  // test on NullPointerException in {@code compressor.compress()}
  @Test
  public void testCompressorCompressNullPointerException() {
    Lz4Compressor compressor = new Lz4Compressor();
    byte[] bytes = generate(1024 * 6);
    compressor.setInput(bytes, 0, bytes.length);
    assertThrows(NullPointerException.class,
        () -> compressor.compress(null, 0, 0),
        "testCompressorCompressNullPointerException error !!!");
  }

  // test on NullPointerException in {@code decompressor.decompress()}
  @Test
  public void testDecompressorCompressNullPointerException() {
    Lz4Decompressor decompressor = new Lz4Decompressor();
    byte[] bytes = generate(1024 * 6);
    decompressor.setInput(bytes, 0, bytes.length);
    assertThrows(NullPointerException.class,
        () -> decompressor.decompress(null, 0, 0),
        "testDecompressorCompressNullPointerException error !!!");
  }

  // test on ArrayIndexOutOfBoundsException in {@code compressor.compress()}
  @Test
  public void testCompressorCompressAIOBException() {
    Lz4Compressor compressor = new Lz4Compressor();
    byte[] bytes = generate(1024 * 6);
    compressor.setInput(bytes, 0, bytes.length);
    assertThrows(ArrayIndexOutOfBoundsException.class,
        () -> compressor.compress(new byte[] {}, 0, -1),
        "testCompressorCompressAIOBException error !!!");
  }

  // test on ArrayIndexOutOfBoundsException in {@code decompressor.decompress()}
  @Test
  public void testDecompressorCompressAIOBException() {
    Lz4Decompressor decompressor = new Lz4Decompressor();
    byte[] bytes = generate(1024 * 6);
    decompressor.setInput(bytes, 0, bytes.length);
    assertThrows(ArrayIndexOutOfBoundsException.class,
        () -> decompressor.decompress(new byte[] {}, 0, -1),
        "testDecompressorCompressAIOBException error !!!");
  }

  // test Lz4Compressor compressor.compress() with an input one byte larger
  // than 64 KiB
  @Test
  public void testSetInputWithBytesSizeMoreThenDefaultLz4CompressorByfferSize() {
    int bytesSize = 1024 * 64 + 1;
    try {
      Lz4Compressor compressor = new Lz4Compressor();
      byte[] bytes = generate(bytesSize);
      assertTrue(compressor.needsInput(), "needsInput error !!!");
      compressor.setInput(bytes, 0, bytes.length);
      byte[] emptyBytes = new byte[bytesSize];
      int csize = compressor.compress(emptyBytes, 0, bytes.length);
      assertTrue(csize != 0,
          "testSetInputWithBytesSizeMoreThenDefaultLz4CompressorByfferSize error !!!");
    } catch (IOException ex) {
      fail("testSetInputWithBytesSizeMoreThenDefaultLz4CompressorByfferSize ex error !!!",
          ex);
    }
  }

  /**
   * Round-trips a random payload through a compressor/decompressor pair and
   * checks byte counters, the finished flag, and the restored content.
   * Also invoked concurrently by {@link #testLz4CompressDecompressInMultiThreads()}.
   */
  @Test
  public void testCompressDecompress() {
    int byteSize = 1024 * 54;
    byte[] bytes = generate(byteSize);
    Lz4Compressor compressor = new Lz4Compressor();
    try {
      compressor.setInput(bytes, 0, bytes.length);
      assertTrue(compressor.getBytesRead() > 0,
          "Lz4CompressDecompress getBytesRead error !!!");
      assertEquals(0L, compressor.getBytesWritten(),
          "Lz4CompressDecompress getBytesWritten before compress error !!!");

      byte[] compressed = new byte[byteSize];
      int cSize = compressor.compress(compressed, 0, compressed.length);
      assertTrue(compressor.getBytesWritten() > 0,
          "Lz4CompressDecompress getBytesWritten after compress error !!!");

      Lz4Decompressor decompressor = new Lz4Decompressor();
      // set as input for decompressor only compressed data indicated with cSize
      decompressor.setInput(compressed, 0, cSize);
      byte[] decompressed = new byte[byteSize];
      decompressor.decompress(decompressed, 0, decompressed.length);

      assertTrue(decompressor.finished(), "testLz4CompressDecompress finished error !!!");
      assertArrayEquals(bytes, decompressed);
      compressor.reset();
      decompressor.reset();
      assertEquals(0, decompressor.getRemaining(),
          "decompressor getRemaining error !!!");
    } catch (IOException e) {
      fail("testLz4CompressDecompress ex error!!!", e);
    }
  }

  /**
   * Closing a block compressor stream without writing must produce exactly
   * 4 bytes of output, and decompressing that output must yield end-of-stream.
   */
  @Test
  public void testCompressorDecompressorEmptyStreamLogic() {
    try {
      // compress empty stream
      ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
      BlockCompressorStream blockCompressorStream = new BlockCompressorStream(
          bytesOut, new Lz4Compressor(), 1024, 0);
      // close without write; closing is the operation under test here
      blockCompressorStream.close();

      // check compressed output
      byte[] buf = bytesOut.toByteArray();
      assertEquals(4, buf.length, "empty stream compressed output size != 4");

      // use compressed output as input for decompression; closing the
      // decompressor stream also closes the wrapped ByteArrayInputStream
      try (BlockDecompressorStream blockDecompressorStream =
               new BlockDecompressorStream(new ByteArrayInputStream(buf),
                   new Lz4Decompressor(), 1024)) {
        // no byte is available because stream was closed
        assertEquals(-1, blockDecompressorStream.read(), "return value is not -1");
      }
    } catch (IOException e) {
      fail("testCompressorDecompressorEmptyStreamLogic ex error !!!"
          + e.getMessage(), e);
    }
  }

  /**
   * Round-trips a payload through the CompressionOutputStream /
   * CompressionInputStream API backed by block LZ4 streams.
   */
  @Test
  public void testCompressorDecopressorLogicWithCompressionStreams() {
    int byteSize = 1024 * 100;
    byte[] bytes = generate(byteSize);
    int bufferSize = 262144;
    int compressionOverhead = (bufferSize / 6) + 32;
    try {
      DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
      CompressionOutputStream deflateFilter = new BlockCompressorStream(
          compressedDataBuffer, new Lz4Compressor(bufferSize), bufferSize,
          compressionOverhead);
      try (DataOutputStream deflateOut =
               new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
        deflateOut.write(bytes, 0, bytes.length);
        deflateOut.flush();
        deflateFilter.finish();
      }

      DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
      deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
          compressedDataBuffer.getLength());

      CompressionInputStream inflateFilter = new BlockDecompressorStream(
          deCompressedDataBuffer, new Lz4Decompressor(bufferSize), bufferSize);

      byte[] result = new byte[byteSize];
      try (DataInputStream inflateIn =
               new DataInputStream(new BufferedInputStream(inflateFilter))) {
        // read(byte[]) may legally return fewer bytes than requested;
        // readFully fails with EOFException if the stream ends early.
        inflateIn.readFully(result);
      }

      assertArrayEquals(bytes, result,
          "original array not equals compress/decompressed array");
    } catch (IOException e) {
      fail("testLz4CompressorDecopressorLogicWithCompressionStreams ex error !!!", e);
    }
  }

  /**
   * Generates a payload of {@code size} pseudo-random bytes, each in the range
   * [0, 16). The small alphabet presumably keeps the data compressible.
   *
   * @param size number of bytes to generate
   * @return a new array of length {@code size}
   */
  public static byte[] generate(int size) {
    byte[] array = new byte[size];
    for (int i = 0; i < size; i++) {
      array[i] = (byte) rnd.nextInt(16);
    }
    return array;
  }

  /** Runs {@link #testCompressDecompress()} concurrently on 10 threads. */
  @Test
  public void testLz4CompressDecompressInMultiThreads() throws Exception {
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
    for (int i = 0; i < 10; i++) {
      ctx.addThread(new MultithreadedTestUtil.TestingThread(ctx) {
        @Override
        public void doWork() throws Exception {
          testCompressDecompress();
        }
      });
    }
    ctx.startThreads();

    ctx.waitFor(60000);
  }

  @Test
  public void testLz4Compatibility() throws Exception {
    // The sequence file was created using native Lz4 codec before HADOOP-17292.
    // After we use lz4-java for lz4 compression, this test makes sure we can
    // decompress the sequence file correctly.
    Path filePath = new Path(TestLz4CompressorDecompressor.class
        .getResource("/lz4/sequencefile").toURI());

    Configuration conf = new Configuration();
    conf.setInt("io.seqfile.compress.blocksize", 1000);
    FileSystem fs = FileSystem.get(conf);

    int expectedLines = 2000;
    int lc = 0;
    // try-with-resources so the reader is closed even if creating the
    // key/value instances fails.
    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf)) {
      Writable key = (Writable) reader.getKeyClass()
          .getDeclaredConstructor().newInstance();
      Writable value = (Writable) reader.getValueClass()
          .getDeclaredConstructor().newInstance();
      while (reader.next(key, value)) {
        assertEquals("key" + lc, key.toString());
        assertEquals("value" + lc, value.toString());
        lc++;
      }
    }
    assertEquals(expectedLines, lc);
  }
}