/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.hadoop.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple smoke tests ensuring that codec operations can be performed with
 * only the API and runtime jars on the classpath.
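 *
 * <p>The {@code IT} prefix marks this class as an integration test; by
 * default the Maven Failsafe plugin picks up {@code IT*} classes during
 * {@code mvn verify}.</p>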
 */
public class ITUseHadoopCodecs {

  private static final Logger LOG = LoggerFactory.getLogger(ITUseHadoopCodecs.class);

  private final Configuration hadoopConf = new Configuration();
  private final int dataCount = 100;
  private final int dataSeed = new Random().nextInt();

  @Test
  public void testGzipCodec() throws IOException {
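    // Force the pure-Java zlib implementation so the test does not depend on
    // the native libhadoop library being installed on the test machine.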
    ZlibFactory.setNativeZlibLoaded(false);
    assertFalse(ZlibFactory.isNativeZlibLoaded(hadoopConf));
    codecTest(hadoopConf, dataSeed, 0, "org.apache.hadoop.io.compress.GzipCodec");
    codecTest(hadoopConf, dataSeed, dataCount, "org.apache.hadoop.io.compress.GzipCodec");
  }

  @Test
  public void testSnappyCodec() throws IOException {
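    // Snappy round-trip; recent Hadoop releases ship a pure-Java Snappy
    // implementation (snappy-java), so no native library setup is needed here.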
    codecTest(hadoopConf, dataSeed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
    codecTest(hadoopConf, dataSeed, dataCount, "org.apache.hadoop.io.compress.SnappyCodec");
  }

  @Test
  public void testLz4Codec() {
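    // Run the round-trip with both plain LZ4 and the high-compression (LZ4HC)
    // variant, toggled through the configuration key below.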
    Arrays.asList(false, true).forEach(useLz4HC -> {
      hadoopConf.setBoolean(
          CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
          useLz4HC);
      try {
        codecTest(hadoopConf, dataSeed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
        codecTest(hadoopConf, dataSeed, dataCount, "org.apache.hadoop.io.compress.Lz4Codec");
      } catch (IOException e) {
        throw new RuntimeException("failed when running codecTest", e);
      }
    });
  }

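  /**
   * Round-trips {@code count} random key/value records through the named
   * codec and verifies that the decompressed output matches the original
   * input byte-for-byte.
   */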
  private void codecTest(Configuration conf, int seed, int count, String codecClass)
      throws IOException {

    // Create the codec
    CompressionCodec codec = null;
    try {
      codec = (CompressionCodec)
              ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf);
    } catch (ClassNotFoundException cnfe) {
      throw new IOException("Illegal codec class: " + codecClass, cnfe);
    }
    LOG.info("Created a Codec object of type: " + codecClass);

    // Generate data
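    // DataOutputBuffer is Hadoop's resettable in-memory output stream; the
    // raw record bytes accumulate here and later serve as the baseline for
    // the round-trip comparison.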
    DataOutputBuffer data = new DataOutputBuffer();
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    for(int i = 0; i < count; ++i) {
      generator.next();
      RandomDatum key = generator.getKey();
      RandomDatum value = generator.getValue();

      key.write(data);
      value.write(data);
    }
    LOG.info("Generated " + count + " records");

    // Compress data
    DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
    try (CompressionOutputStream deflateFilter =
        codec.createOutputStream(compressedDataBuffer);
        DataOutputStream deflateOut =
            new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
      deflateOut.write(data.getData(), 0, data.getLength());
      deflateOut.flush();
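      // finish() writes the codec's trailer without closing the underlying
      // stream, completing the compressed payload before the buffers close.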
      deflateFilter.finish();
    }

    // De-compress data
    DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
    deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
            compressedDataBuffer.getLength());
    DataInputBuffer originalData = new DataInputBuffer();
    originalData.reset(data.getData(), 0, data.getLength());
    try (CompressionInputStream inflateFilter =
        codec.createInputStream(deCompressedDataBuffer);
        DataInputStream originalIn =
            new DataInputStream(new BufferedInputStream(originalData))) {

      // Compare the two streams a byte at a time; they must match on every
      // byte and reach end-of-stream (-1) together.
      int expected;
      do {
        expected = originalIn.read();
        assertEquals("Inflated stream read by byte does not match",
                expected, inflateFilter.read());
      } while (expected != -1);
    }

    LOG.info("SUCCESS! Completed checking " + count + " records");
  }
}