TestTextBlockAliasMap.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.*;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.junit.Test;

import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
import static org.junit.Assert.*;

/**
 * Test for the text based block format for provided block maps.
 */
public class TestTextBlockAliasMap {

  static final String OUTFILE_PATH = "hdfs://dummyServer:0000/";
  static final String OUTFILE_BASENAME = "dummyFile";
  static final Path OUTFILE = new Path(OUTFILE_PATH, OUTFILE_BASENAME + "txt");
  static final String BPID = "BPID-0";

  void check(TextWriter.Options opts, final Path vp,
      final Class<? extends CompressionCodec> vc) throws IOException {
    TextFileRegionAliasMap mFmt = new TextFileRegionAliasMap() {
      @Override
      public TextWriter createWriter(Path file, CompressionCodec codec,
          String delim, Configuration conf) throws IOException {
        assertEquals(vp, file);
        if (null == vc) {
          assertNull(codec);
        } else {
          assertEquals(vc, codec.getClass());
        }
        return null; // ignored
      }
    };
    mFmt.getWriter(opts, BPID);
  }

  void check(TextReader.Options opts, final Path vp,
      final Class<? extends CompressionCodec> vc) throws IOException {
    TextFileRegionAliasMap aliasMap = new TextFileRegionAliasMap() {
      @Override
      public TextReader createReader(Path file, String delim, Configuration cfg,
          String blockPoolID) throws IOException {
        assertEquals(vp, file);
        if (null != vc) {
          CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
          CompressionCodec codec = factory.getCodec(file);
          assertEquals(vc, codec.getClass());
        }
        return null; // ignored
      }
    };
    aliasMap.getReader(opts, BPID);
  }

  @Test
  public void testWriterOptions() throws Exception {
    TextWriter.Options opts = TextWriter.defaults();
    assertTrue(opts instanceof WriterOptions);
    WriterOptions wopts = (WriterOptions) opts;
    Path def =
        new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR_DEFAULT);
    assertEquals(def, wopts.getDir());
    assertNull(wopts.getCodec());

    Path cp = new Path(OUTFILE_PATH, "blocks_" + BPID + ".csv");
    opts.dirName(new Path(OUTFILE_PATH));
    check(opts, cp, null);

    opts.codec("gzip");
    cp = new Path(OUTFILE_PATH, "blocks_" + BPID + ".csv.gz");
    check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
  }

  @Test
  public void testReaderOptions() throws Exception {
    TextReader.Options opts = TextReader.defaults();
    assertTrue(opts instanceof ReaderOptions);
    ReaderOptions ropts = (ReaderOptions) opts;

    Path cp = new Path(OUTFILE_PATH, fileNameFromBlockPoolID(BPID));
    opts.filename(cp);
    check(opts, cp, null);

    cp = new Path(OUTFILE_PATH, "blocks_" + BPID + ".csv.gz");
    opts.filename(cp);
    check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
  }

  @Test
  public void testCSVReadWrite() throws Exception {
    final DataOutputBuffer out = new DataOutputBuffer();
    FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
    FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
    FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
    try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
      csv.store(r1);
      csv.store(r2);
      csv.store(r3);
    }
    Iterator<FileRegion> i3;
    try (TextReader csv = new TextReader(null, null, null, ",") {
      @Override
      public InputStream createStream() {
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), 0, out.getLength());
        return in;
        }}) {
      Iterator<FileRegion> i1 = csv.iterator();
      assertEquals(r1, i1.next());
      Iterator<FileRegion> i2 = csv.iterator();
      assertEquals(r1, i2.next());
      assertEquals(r2, i2.next());
      assertEquals(r3, i2.next());
      assertEquals(r2, i1.next());
      assertEquals(r3, i1.next());

      assertFalse(i1.hasNext());
      assertFalse(i2.hasNext());
      i3 = csv.iterator();
    }
    try {
      i3.next();
    } catch (IllegalStateException e) {
      return;
    }
    fail("Invalid iterator");
  }

  @Test
  public void testCSVReadWriteTsv() throws Exception {
    final DataOutputBuffer out = new DataOutputBuffer();
    FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
    FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
    FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
    try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
      csv.store(r1);
      csv.store(r2);
      csv.store(r3);
    }
    Iterator<FileRegion> i3;
    try (TextReader csv = new TextReader(null, null, null, "\t") {
      @Override
      public InputStream createStream() {
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), 0, out.getLength());
        return in;
      }}) {
      Iterator<FileRegion> i1 = csv.iterator();
      assertEquals(r1, i1.next());
      Iterator<FileRegion> i2 = csv.iterator();
      assertEquals(r1, i2.next());
      assertEquals(r2, i2.next());
      assertEquals(r3, i2.next());
      assertEquals(r2, i1.next());
      assertEquals(r3, i1.next());

      assertFalse(i1.hasNext());
      assertFalse(i2.hasNext());
      i3 = csv.iterator();
    }
    try {
      i3.next();
    } catch (IllegalStateException e) {
      return;
    }
    fail("Invalid iterator");
  }

}