BaseTestLineRecordReaderBZip2.java

/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
import org.apache.hadoop.io.compress.bzip2.BZip2Utils;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.junit.Assert.assertEquals;

public abstract class BaseTestLineRecordReaderBZip2 {

  // LF stands for line feed
  private static final byte[] LF = new byte[] {'\n'};
  // CR stands for carriage return
  private static final byte[] CR = new byte[] {'\r'};
  // CR_LF is a carriage return immediately followed by a line feed
  private static final byte[] CR_LF = new byte[] {'\r', '\n'};

  // Configuration instantiated in setUp(); individual tests may modify it
  // (e.g. buffer size or record delimiter) before writing/reading the file.
  private Configuration conf;
  // File system resolved from the test work directory in setUp().
  private FileSystem fs;
  // Path of the bzip2 file each test writes; deleted again in tearDown().
  private Path tempFile;

  /**
   * @return the configuration instance assigned by {@link #setUp()}.
   */
  public Configuration getConf() {
    return this.conf;
  }

  /**
   * @return the file system assigned by {@link #setUp()}.
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * @return the path of the temporary bzip2 file assigned by
   *         {@link #setUp()}.
   */
  public Path getTempFile() {
    return this.tempFile;
  }

  /**
   * Prepares a fresh configuration, resolves the file system of the test
   * work directory and computes the path of the temporary bzip2 file.
   *
   * @throws Exception if the file system cannot be obtained.
   */
  @Before
  public void setUp() throws Exception {
    conf = new Configuration();

    // Work directory: <test.build.data or "target">/data/<test class name>
    String buildDataRoot = System.getProperty("test.build.data", "target");
    String relativeWorkDir = "data/" + getClass().getSimpleName();
    Path workDir = new Path(buildDataRoot, relativeWorkDir);

    fs = workDir.getFileSystem(conf);

    tempFile = new Path(new Path(workDir, "input"), "test.txt.bz2");
  }

  /**
   * Deletes the temporary file written by the test (non-recursively).
   *
   * <p>JUnit runs {@code @After} methods even when the {@code @Before}
   * method threw, in which case {@code fs} and/or {@code tempFile} may never
   * have been assigned. The null guard keeps a secondary
   * {@link NullPointerException} from obscuring the original setup failure.
   *
   * @throws Exception if the delete operation fails.
   */
  @After
  public void tearDown() throws Exception {
    if (fs != null && tempFile != null) {
      fs.delete(tempFile, /* recursive */ false);
    }
  }

  /**
   * Scenario (per the test name): the first block ends with a LF. Many
   * LF-delimited records are written first, followed by three short
   * LF-delimited records.
   */
  @Test
  public void firstBlockEndsWithLF() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE, 1000, LF);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1001, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the first block ends with a LF and the
   * second block starts with a LF.
   */
  @Test
  public void firstBlockEndsWithLFSecondBlockStartsWithLF() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE, 1000, LF);

      // 254 empty LF-terminated rows: because of run-length encoding they
      // get rolled into the first block, and it is the 255th LF that
      // triggers a run to be written to the block. Only 254 are needed here
      // since the previous writeManyRecords call already ended with a LF.
      out.writeManyRecords(254, 254, LF);

      // This LF is expected to be the first byte of the second block; when
      // splitting at blocks, the first split reads it as its additional
      // record.
      out.writeRecord(1, LF);

      for (int i = 0; i < 2; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1255, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the first block ends with a LF and the
   * second block starts with a CR.
   */
  @Test
  public void firstBlockEndsWithLFSecondBlockStartsWithCR() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE, 1000, LF);
      // A record consisting only of the CR delimiter.
      out.writeRecord(1, CR);
      for (int i = 0; i < 2; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1001, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the first block ends with a CR+LF pair.
   * Many CR_LF-delimited records are written first, followed by three short
   * LF-delimited records.
   */
  @Test
  public void firstBlockEndsWithCRLF() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE, 1000, CR_LF);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1001, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the content of the last record that
   * begins in the first block continues into the second block.
   */
  @Test
  public void lastRecordContentSpanAcrossBlocks() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
      // Longer than the 50 bytes left over from the call above.
      out.writeRecord(100, LF);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the content of the last record of the
   * first block ends in that block while its LF delimiter lands in the next
   * block.
   */
  @Test
  public void lastRecordOfBlockHasItsLFInNextBlock() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
      // The terminating LF becomes the first byte of the second block.
      out.writeRecord(51, LF);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the last record of the first block has
   * its whole CR+LF delimiter in the second block.
   */
  @Test
  public void lastRecordOfFirstBlockHasItsCRLFInSecondBlock() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
      // CR and LF together form the first two bytes of the second block.
      out.writeRecord(52, CR_LF);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the CR+LF delimiter of the last record of
   * the first block is split between the first and the second block.
   */
  @Test
  public void lastRecordOfFirstBlockHasItsCRLFPartlyInSecondBlock()
      throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
      // CR is the last byte of the first block, while LF is the first byte
      // of the second block.
      out.writeRecord(51, CR_LF);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): the first block ends with a CR and the
   * byte that opens the second block is not a LF.
   */
  @Test
  public void lastByteInFirstBlockIsCRFirstByteInSecondBlockIsNotLF()
      throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE, 1000, CR);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, LF);
      }
    }
    long[] expectedCountsPerBlockSplit = {1001, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): CR-delimited records are read while the
   * I/O buffer size is configured to a single byte.
   */
  @Test
  public void usingCRDelimiterWithSmallestBufferSize() throws Exception {
    // Forces calling LineReader#fillBuffer for every byte read
    conf.set(IO_FILE_BUFFER_SIZE_KEY, "1");

    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, CR);
      out.writeRecord(100, CR);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, CR);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): three CR-delimited records are written,
   * each sized at three times {@code BLOCK_SIZE}, so nine block-aligned
   * splits are expected in total.
   */
  @Test
  public void delimitedByCRSpanningThreeBlocks() throws Exception {
    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      for (int i = 0; i < 3; i++) {
        out.writeRecord(3 * BLOCK_SIZE, CR);
      }
    }
    long[] expectedCountsPerBlockSplit = {1, 0, 1, 0, 0, 1, 0, 0, 0};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): with the custom delimiter "end", the last
   * three bytes of the first block are the delimiter itself.
   */
  @Test
  public void customDelimiterLastThreeBytesInBlockAreDelimiter()
      throws Exception {
    byte[] endMarker = {'e', 'n', 'd'};
    setDelimiter(endMarker);

    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE, 1000, endMarker);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, endMarker);
      }
    }
    long[] expectedCountsPerBlockSplit = {1001, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): with the custom delimiter "end", a
   * delimiter occurrence straddles the boundary between the first and the
   * second block.
   */
  @Test
  public void customDelimiterDelimiterSpansAcrossBlocks()
      throws Exception {
    byte[] endMarker = {'e', 'n', 'd'};
    setDelimiter(endMarker);

    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, endMarker);
      out.writeRecord(52, endMarker);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, endMarker);
      }
    }
    long[] expectedCountsPerBlockSplit = {1001, 2};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): with the custom delimiter "end", the
   * delimiter of the last record of the first block begins exactly at the
   * start of the next block.
   */
  @Test
  public void customDelimiterLastRecordDelimiterStartsAtNextBlockStart()
      throws Exception {
    byte[] endMarker = {'e', 'n', 'd'};
    setDelimiter(endMarker);

    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 50, 999, endMarker);
      out.writeRecord(53, endMarker);
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, endMarker);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Scenario (per the test name): with the custom delimiter "end", the
   * bytes around the block boundary merely share a prefix with the
   * delimiter without forming a complete occurrence of it.
   */
  @Test
  public void customDelimiterLastBlockBytesShareCommonPrefixWithDelimiter()
      throws Exception {
    byte[] endMarker = {'e', 'n', 'd'};
    setDelimiter(endMarker);

    try (BZip2TextFileWriter out = new BZip2TextFileWriter(tempFile, conf)) {
      out.writeManyRecords(BLOCK_SIZE - 4, 999, endMarker);
      // "an e" (the first 4 bytes of the text below) closes the first
      // block, so its final byte 'e' matches the first delimiter byte. The
      // second block opens with 'n', matching the second delimiter byte,
      // but the following 'c' differs from the delimiter's last byte 'd'.
      // Hence the split reading the first block must not read an additional
      // record, and the split reading the second block simply discards
      // "nchanting tale coming to an end".
      out.write("an enchanting tale coming to an end");
      for (int i = 0; i < 3; i++) {
        out.writeRecord(10, endMarker);
      }
    }
    long[] expectedCountsPerBlockSplit = {1000, 3};
    assertRecordCountsPerSplit(tempFile, expectedCountsPerBlockSplit);
  }

  /**
   * Supplies the reader helper that subclasses want exercised by these
   * tests. The returned helper is used to count the records read from
   * (start, length) splits of the given file.
   *
   * @param file path of the bzip2 file that the tests have written.
   * @return the helper used to count records per split.
   */
  protected abstract BaseLineRecordReaderHelper newReader(Path file);

  /**
   * Runs every split-layout check against the given file.
   *
   * @param path file to read.
   * @param countsIfSplitAtBlocks expected number of records per split when
   *        each split covers exactly one block, in block order.
   * @throws IOException if reading the file fails.
   */
  private void assertRecordCountsPerSplit(
      Path path, long[] countsIfSplitAtBlocks) throws IOException {
    RecordCountAssert checker =
        new RecordCountAssert(path, countsIfSplitAtBlocks);

    // The whole file as one split first, then progressively finer layouts.
    checker.assertSingleSplit();
    checker.assertSplittingAtBlocks();
    checker.assertSplittingJustAfterSecondBlockStarts();
    checker.assertSplittingEachBlockRangeInThreeParts();
    checker.assertSplitsAroundBlockStartOffsets();
  }

  /**
   * Asserts the number of records read from a file under several ways of
   * dividing it into splits, starting from the counts expected when every
   * split covers exactly one block.
   */
  private class RecordCountAssert {

    private final BaseLineRecordReaderHelper reader;
    private final long numBlocks;
    private final long[] countsIfSplitAtBlocks;
    private final long fileSize;
    private final long totalRecords;
    private final List<Long> nextBlockOffsets;

    /**
     * @param path file under test.
     * @param countsIfSplitAtBlocks expected record count of each
     *        block-aligned split; its length is taken as the number of
     *        blocks.
     * @throws IOException if the file cannot be inspected.
     */
    RecordCountAssert(
        Path path, long[] countsIfSplitAtBlocks) throws IOException {
      reader = newReader(path);
      this.countsIfSplitAtBlocks = countsIfSplitAtBlocks;
      fileSize = getFileSize(path);

      long sum = 0;
      for (long count : countsIfSplitAtBlocks) {
        sum += count;
      }
      totalRecords = sum;
      numBlocks = countsIfSplitAtBlocks.length;
      nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(path, conf);

      // One "next block" marker is expected per block boundary.
      long markersPlusOne = nextBlockOffsets.size() + 1;
      assertEquals(numBlocks, markersPlusOne);
    }

    /** Reading the whole file as one split must yield every record. */
    private void assertSingleSplit() throws IOException {
      long actual = reader.countRecords(0, fileSize);
      assertEquals(totalRecords, actual);
    }

    /** Each block-aligned split must yield its expected count. */
    private void assertSplittingAtBlocks() throws IOException {
      for (SplitRange blockSplit : getSplitsAtBlocks()) {
        assertSplit(blockSplit);
      }
    }

    /**
     * Uses two splits, the first one ending one byte past the first block
     * marker offset: the first split is expected to yield the records of
     * the first two blocks and the second split the remainder.
     */
    private void assertSplittingJustAfterSecondBlockStarts()
        throws IOException {
      if (numBlocks > 1) {
        long firstTwoBlocks =
            countsIfSplitAtBlocks[0] + countsIfSplitAtBlocks[1];
        long boundary = nextBlockOffsets.get(0) + 1;

        assertEquals(firstTwoBlocks, reader.countRecords(0, boundary));
        assertEquals(
            totalRecords - firstTwoBlocks,
            reader.countRecords(boundary, fileSize - boundary));
      }
    }

    /**
     * Divides every block-aligned split in three parts; the first part is
     * expected to yield all records of the block and the other two none.
     */
    private void assertSplittingEachBlockRangeInThreeParts()
        throws IOException {
      for (SplitRange blockSplit : getSplitsAtBlocks()) {
        long[] perPart = {blockSplit.expectedNumRecords, 0, 0};
        for (SplitRange part : blockSplit.divide(perPart)) {
          assertSplit(part);
        }
      }
    }

    /**
     * Probes tiny splits located right before, at, and right after the
     * start offset of every block-aligned split.
     */
    private void assertSplitsAroundBlockStartOffsets()
        throws IOException {
      for (SplitRange blockSplit : getSplitsAtBlocks()) {
        assertSplit(blockSplit.withLength(1));

        // Splits beginning before the block start only make sense when the
        // block is not the first one.
        if (blockSplit.start > 0) {
          SplitRange twoBefore = blockSplit.moveBy(-2);
          assertSplit(twoBefore.withLength(3));
          assertSplit(twoBefore.withLength(2).withExpectedNumRecords(0));

          SplitRange oneBefore = blockSplit.moveBy(-1);
          assertSplit(oneBefore.withLength(2));
          assertSplit(oneBefore.withLength(1).withExpectedNumRecords(0));
        }

        for (int shift = 1; shift <= 2; shift++) {
          assertSplit(
              blockSplit.moveBy(shift).withLength(1).withExpectedNumRecords(0));
        }
      }
    }

    /**
     * Builds one split per block: the first starts at offset 0, the last
     * ends at the file size, and the boundaries in between are the block
     * marker offsets.
     */
    private List<SplitRange> getSplitsAtBlocks() {
      List<SplitRange> ranges = new ArrayList<>();
      long blockStart = 0;
      for (int block = 0; block < numBlocks; block++) {
        boolean isLastBlock = block == numBlocks - 1;
        long blockEnd = isLastBlock ? fileSize : nextBlockOffsets.get(block);
        ranges.add(new SplitRange(
            "Block" + block,
            blockStart,
            blockEnd - blockStart,
            countsIfSplitAtBlocks[block]));
        // The next block starts where this one ended.
        blockStart = blockEnd;
      }
      return ranges;
    }

    /** Checks every given split in iteration order. */
    private void assertSplits(Iterable<SplitRange> splitRanges)
        throws IOException {
      for (SplitRange range : splitRanges) {
        assertSplit(range);
      }
    }

    /**
     * Counts the records of one split and compares against its expected
     * count, using the split description as the failure message.
     */
    private void assertSplit(SplitRange splitRange) throws IOException {
      String description = splitRange.toString();
      long counted = reader.countRecords(splitRange.start, splitRange.length);
      assertEquals(description, splitRange.expectedNumRecords, counted);
    }
  }

  /**
   * Immutable description of a split: a display name, a start offset, a
   * length, and the number of records expected when reading it. The
   * {@code with*} and {@code moveBy} methods return modified copies.
   */
  private static class SplitRange {
    private final String name;
    private final long start;
    private final long length;
    private final long expectedNumRecords;

    SplitRange(
        String name,
        long start,
        long length,
        long expectedNumRecords) {
      this.name = name;
      this.start = start;
      this.length = length;
      this.expectedNumRecords = expectedNumRecords;
    }

    /** Renders all four fields; used as the assertion failure message. */
    @Override
    public String toString() {
      return SplitRange.class.getSimpleName()
          + "[name='" + name + "'"
          + ", start=" + start
          + ", length=" + length
          + ", expectedNumRecords=" + expectedNumRecords
          + "]";
    }

    /**
     * Cuts this range into consecutive parts of equal size, one part per
     * entry of the argument; the last part additionally absorbs the
     * remainder of the division.
     *
     * @param expectedNumRecordsPerPart expected record count of each part;
     *        must be non-empty and no longer than this range's length.
     * @return the parts in offset order, named {@code <name>_Part<i>}.
     */
    List<SplitRange> divide(long[] expectedNumRecordsPerPart) {
      final int partCount = expectedNumRecordsPerPart.length;
      checkArgument(partCount > 0);

      final long baseSize = length / partCount;
      checkArgument(baseSize > 0);
      final long remainder = length % partCount;

      List<SplitRange> parts = new ArrayList<>();
      long offset = start;
      int index = 0;
      for (long expected : expectedNumRecordsPerPart) {
        boolean isLastPart = index == partCount - 1;
        long size = isLastPart ? baseSize + remainder : baseSize;
        parts.add(
            new SplitRange(name + "_Part" + index, offset, size, expected));
        offset += size;
        index++;
      }
      return parts;
    }

    /** @return a copy of this range with the given length. */
    SplitRange withLength(long newLength) {
      return new SplitRange(
          this.name, this.start, newLength, this.expectedNumRecords);
    }

    /** @return a copy of this range with the given expected count. */
    SplitRange withExpectedNumRecords(long newExpectedNumRecords) {
      return new SplitRange(
          this.name, this.start, this.length, newExpectedNumRecords);
    }

    /** @return a copy of this range whose start is shifted by delta. */
    SplitRange moveBy(long delta) {
      long shiftedStart = this.start + delta;
      return new SplitRange(
          this.name, shiftedStart, this.length, this.expectedNumRecords);
    }
  }

  /**
   * @param path file to inspect.
   * @return the length reported by the file status of {@code path}.
   * @throws IOException if the file status cannot be obtained.
   */
  private long getFileSize(Path path) throws IOException {
    final long sizeInBytes = fs.getFileStatus(path).getLen();
    return sizeInBytes;
  }

  /**
   * Stores the given delimiter, decoded as UTF-8, under the
   * {@code textinputformat.record.delimiter} configuration key.
   *
   * @param delimiter raw bytes of the record delimiter.
   */
  private void setDelimiter(byte[] delimiter) {
    String delimiterText = new String(delimiter, StandardCharsets.UTF_8);
    conf.set("textinputformat.record.delimiter", delimiterText);
  }
}