/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import org.apache.hadoop.thirdparty.com.google.common.collect.HashMultiset;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;

public class TestCombineFileInputFormat {

  private static final String[] rack1 = new String[] { "/r1" };
  private static final String[] hosts1 = new String[] { "host1.rack1.com" };
  private static final String[] rack2 = new String[] { "/r2" };
  private static final String[] hosts2 = new String[] { "host2.rack2.com" };
  private static final String[] rack3 = new String[] { "/r3" };
  private static final String[] hosts3 = new String[] { "host3.rack3.com" };
  final Path inDir = new Path("/racktesting");
  final Path outputPath = new Path("/output");
  final Path dir1 = new Path(inDir, "/dir1");
  final Path dir2 = new Path(inDir, "/dir2");
  final Path dir3 = new Path(inDir, "/dir3");
  final Path dir4 = new Path(inDir, "/dir4");
  final Path dir5 = new Path(inDir, "/dir5");

  static final int BLOCKSIZE = 1024;
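  // One block's worth of data, written repeatedly by the file-writing helpers.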
  static final byte[] databuf = new byte[BLOCKSIZE];

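  // Mockito-mocked list: the placement tests record each split's first
  // location into it and then verify() which hosts were observed.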
  @Mock
  private List<String> mockList;

  @Before
  public void initMocks() {
    MockitoAnnotations.initMocks(this);
  }

  private static final String DUMMY_FS_URI = "dummyfs:///";

  /** Dummy class to extend CombineFileInputFormat. */
  private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
    @Override
    public RecordReader<Text,Text> createRecordReader(InputSplit split, 
        TaskAttemptContext context) throws IOException {
      return null;
    }
  }

  /** Dummy class to extend CombineFileInputFormat. It allows
   * non-existent files to be passed into CombineFileInputFormat, making
   * for easy testing without having to create real files.
   */
  private class DummyInputFormat1 extends DummyInputFormat {
    @Override
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
      Path[] files = getInputPaths(job);
      List<FileStatus> results = new ArrayList<FileStatus>();
      for (int i = 0; i < files.length; i++) {
        Path p = files[i];
        FileSystem fs = p.getFileSystem(job.getConfiguration());
        results.add(fs.getFileStatus(p));
      }
      return results;
    }
  }

  /** Dummy FileSystem extending DistributedFileSystem. It allows
   * testing with files having missing blocks without actually removing replicas.
   */
  public static class MissingBlockFileSystem extends DistributedFileSystem {
    String fileWithMissingBlocks;

    @Override
    public void initialize(URI name, Configuration conf) throws IOException {
      fileWithMissingBlocks = "";
      super.initialize(name, conf);
    }

    @Override
    public BlockLocation[] getFileBlockLocations(
        FileStatus stat, long start, long len) throws IOException {
      if (stat.isDirectory()) {
        return null;
      }
      System.out.println("File " + stat.getPath());
      String name = stat.getPath().toUri().getPath();
      BlockLocation[] locs =
        super.getFileBlockLocations(stat, start, len);
      if (name.equals(fileWithMissingBlocks)) {
        System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
        locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
            new String[0], locs[0].getOffset(), locs[0].getLength()), null);
      }
      return locs;
    }

    public void setFileWithMissingBlocks(String f) {
      fileWithMissingBlocks = f;
    }
  }

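  // Configuration key read back by DummyRecordReader; tests use it to check
  // which TaskAttemptContext the reader is currently bound to.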
  private static final String DUMMY_KEY = "dummy.rr.key";

  private static class DummyRecordReader extends RecordReader<Text, Text> {
    private TaskAttemptContext context;
    private CombineFileSplit s;
    private int idx;
    private boolean used;

    public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context,
        Integer i) {
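      // This (CombineFileSplit, TaskAttemptContext, Integer) signature is
      // required: CombineFileRecordReader instantiates child readers
      // reflectively with exactly these three arguments.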
      this.context = context;
      this.idx = i;
      this.s = split;
      this.used = true;
    }

    /** @return a value specified in the context to check whether the
     * context is properly updated by the initialize() method.
     */
    public String getDummyConfVal() {
      return this.context.getConfiguration().get(DUMMY_KEY);
    }

    public void initialize(InputSplit split, TaskAttemptContext context) {
      this.context = context;
      this.s = (CombineFileSplit) split;

      // By setting used to true in the c'tor, but false in initialize,
      // we can check that initialize() is always called before use
      // (e.g., in testReinit()).
      this.used = false;
    }

    public boolean nextKeyValue() {
      boolean ret = !used;
      this.used = true;
      return ret;
    }

    public Text getCurrentKey() {
      return new Text(this.context.getConfiguration().get(DUMMY_KEY));
    }

    public Text getCurrentValue() {
      return new Text(this.s.getPath(idx).toString());
    }

    public float getProgress() {
      return used ? 1.0f : 0.0f;
    }

    public void close() {
    }
  }

  /** Extend CFIF to use CFRR with DummyRecordReader */
  private class ChildRRInputFormat extends CombineFileInputFormat<Text, Text> {
    @SuppressWarnings("unchecked")
    @Override
    public RecordReader<Text,Text> createRecordReader(InputSplit split, 
        TaskAttemptContext context) throws IOException {
      return new CombineFileRecordReader((CombineFileSplit) split, context,
          (Class) DummyRecordReader.class);
    }
  }

  @Test
  public void testRecordReaderInit() throws InterruptedException, IOException {
    // Test that we properly initialize the child recordreader when
    // CombineFileInputFormat and CombineFileRecordReader are used.

    TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
    Configuration conf1 = new Configuration();
    conf1.set(DUMMY_KEY, "STATE1");
    TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);

    // This will create a CombineFileRecordReader that itself contains a
    // DummyRecordReader.
    InputFormat inputFormat = new ChildRRInputFormat();

    Path [] files = { new Path("file1") };
    long [] lengths = { 1 };

    CombineFileSplit split = new CombineFileSplit(files, lengths);

    RecordReader rr = inputFormat.createRecordReader(split, context1);
    assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

    // Verify that the initial configuration is the one being used.
    // Right after construction the dummy key should have value "STATE1"
    assertEquals("Invalid initial dummy key value", "STATE1",
      rr.getCurrentKey().toString());

    // Switch the active context for the RecordReader...
    Configuration conf2 = new Configuration();
    conf2.set(DUMMY_KEY, "STATE2");
    TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
    rr.initialize(split, context2);

    // And verify that the new context is updated into the child record reader.
    assertEquals("Invalid secondary dummy key value", "STATE2",
      rr.getCurrentKey().toString());
  }

  @Test
  public void testReinit() throws Exception {
    // Test that a split containing multiple files works correctly,
    // with the child RecordReader getting its initialize() method
    // called a second time.
    TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
    Configuration conf = new Configuration();
    TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);

    // This will create a CombineFileRecordReader that itself contains a
    // DummyRecordReader.
    InputFormat inputFormat = new ChildRRInputFormat();

    Path [] files = { new Path("file1"), new Path("file2") };
    long [] lengths = { 1, 1 };

    CombineFileSplit split = new CombineFileSplit(files, lengths);
    RecordReader rr = inputFormat.createRecordReader(split, context);
    assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

    // first initialize() call comes from MapTask. We'll do it here.
    rr.initialize(split, context);

    // First value is first filename.
    assertTrue(rr.nextKeyValue());
    assertEquals("file1", rr.getCurrentValue().toString());

    // The inner RR will return false, because it only emits one (k, v) pair.
    // But there's another sub-split to process. This returns true to us.
    assertTrue(rr.nextKeyValue());
    
    // And the 2nd rr will have its initialize method called correctly.
    assertEquals("file2", rr.getCurrentValue().toString());
    
    // But after both child RR's have returned their singleton (k, v), this
    // should also return false.
    assertFalse(rr.nextKeyValue());
  }

  /**
   * For testing that each split has the expected name, length, and offset.
   */
  private final class Split {
    private String name;
    private long length;
    private long offset;

    public Split(String name, long length, long offset) {
      this.name = name;
      this.length = length;
      this.offset = offset;
    }

    public String getName() {
      return name;
    }

    public long getLength() {
      return length;
    }

    public long getOffset() {
      return offset;
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof Split) {
        Split split = ((Split) obj);
        return split.name.equals(name) && split.length == length
            && split.offset == offset;
      }
      return false;
    }

    @Override
    public int hashCode() {
      // Keep the equals()/hashCode() contract: Split instances are stored
      // in HashSets by the split-placement tests.
      return Objects.hash(name, length, offset);
    }
  }

  /**
   * The test suppresses unchecked warnings in
   * {@link org.mockito.Mockito#reset}. Although calling the method is
   * bad practice, we call it instead of splitting the test
   * (i.e. restarting MiniDFSCluster) to save time.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testSplitPlacement() throws Exception {
    MiniDFSCluster dfs = null;
    FileSystem fileSys = null;
    try {
      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
       * 1) file1 and file5, just after starting the datanode on r1, with 
       *    a repl factor of 1, and,
       * 2) file2, just after starting the datanode on r2, with 
       *    a repl factor of 2, and,
       * 3) file3 and file4, after starting all three datanodes, with a repl
       *    factor of 3.
       * At the end, file1 and file5 will be present only on datanode1, file2
       * will be present on datanode1 and datanode2, and file3 and file4 will
       * be present on all datanodes.
       */
      Configuration conf = new Configuration();
      conf.setBoolean("dfs.replication.considerLoad", false);
      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
          .build();
      dfs.waitActive();

      fileSys = dfs.getFileSystem();
      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }
      Path file1 = new Path(dir1 + "/file1");
      writeFile(conf, file1, (short) 1, 1);
      // create another file on the same datanode
      Path file5 = new Path(dir5 + "/file5");
      writeFile(conf, file5, (short) 1, 1);
      // split it using a CombineFileInputFormat
      DummyInputFormat inFormat = new DummyInputFormat();
      Job job = Job.getInstance(conf);
      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
      List<InputSplit> splits = inFormat.getSplits(job);
      System.out.println("Made splits(Test0): " + splits.size());
      for (InputSplit split : splits) {
        System.out.println("File split(Test0): " + split);
      }
      assertEquals(1, splits.size());
      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
      assertEquals(2, fileSplit.getNumPaths());
      assertEquals(1, fileSplit.getLocations().length);
      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
      assertEquals(0, fileSplit.getOffset(0));
      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
      assertEquals(0, fileSplit.getOffset(1));
      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
      assertEquals(hosts1[0], fileSplit.getLocations()[0]);

      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
      dfs.waitActive();

      // create file on two datanodes.
      Path file2 = new Path(dir2 + "/file2");
      writeFile(conf, file2, (short) 2, 2);

      // split it using a CombineFileInputFormat
      inFormat = new DummyInputFormat();
      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
      inFormat.setMinSplitSizeRack(BLOCKSIZE);
      splits = inFormat.getSplits(job);
      System.out.println("Made splits(Test1): " + splits.size());

      for (InputSplit split : splits) {
        System.out.println("File split(Test1): " + split);
      }

      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        /**
         * If rack1 is processed first by
         * {@link CombineFileInputFormat#createSplits},
         * create only one split on rack1. Otherwise create two splits.
         */
        if (splits.size() == 2) {
          // first split is on rack2, contains file2
          if (split.equals(splits.get(0))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(file2.getName(), fileSplit.getPath(1).getName());
            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          // second split is on rack1, contains file1
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 1) {
          // first split is on rack1, contains file1 and file2.
          assertEquals(3, fileSplit.getNumPaths());
          Set<Split> expected = new HashSet<>();
          expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
          expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
          expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
          List<Split> actual = new ArrayList<>();
          for (int i = 0; i < 3; i++) {
            String name = fileSplit.getPath(i).getName();
            long length = fileSplit.getLength(i);
            long offset = fileSplit.getOffset(i);
            actual.add(new Split(name, length, offset));
          }
          assertTrue(actual.containsAll(expected));
          assertEquals(1, fileSplit.getLocations().length);
          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
        } else {
          fail("Expected split size is 1 or 2, but actual size is "
              + splits.size());
        }
      }

      // create another file on 3 datanodes and 3 racks.
      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
      dfs.waitActive();
      Path file3 = new Path(dir3 + "/file3");
      writeFile(conf, file3, (short) 3, 3);
      inFormat = new DummyInputFormat();
      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
      inFormat.setMinSplitSizeRack(BLOCKSIZE);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test2): " + split);
      }

      Set<Split> expected = new HashSet<>();
      expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
      expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
      expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
      expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
      expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
      expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
      List<Split> actual = new ArrayList<>();

      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        /**
         * If rack1 is processed first by
         * {@link CombineFileInputFormat#createSplits},
         * create only one split on rack1.
         * If rack2 or rack3 is processed first and rack1 is processed second,
         * create one split on rack2 or rack3 and the other split is on rack1.
         * Otherwise create 3 splits for each rack.
         */
        if (splits.size() == 3) {
          // first split is on rack3, contains file3
          if (split.equals(splits.get(0))) {
            assertEquals(3, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file3.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(file3.getName(), fileSplit.getPath(1).getName());
            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
            assertEquals(file3.getName(), fileSplit.getPath(2).getName());
            assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
            assertEquals(BLOCKSIZE, fileSplit.getLength(2));
            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
          }
          // second split is on rack2, contains file2
          if (split.equals(splits.get(1))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(file2.getName(), fileSplit.getPath(1).getName());
            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          // third split is on rack1, contains file1
          if (split.equals(splits.get(2))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 2) {
          // first split is on rack2 or rack3, with two or three blocks.
          if (split.equals(splits.get(0))) {
            assertEquals(1, fileSplit.getLocations().length);
            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
              assertEquals(2, fileSplit.getNumPaths());
            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
              assertEquals(3, fileSplit.getNumPaths());
            } else {
              fail("First split should be on rack2 or rack3.");
            }
          }
          // second split is on rack1, containing the remaining files.
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 1) {
          // the only split is on rack1, containing all three files.
          assertEquals(1, fileSplit.getLocations().length);
          assertEquals(6, fileSplit.getNumPaths());
          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
        } else {
          fail("Split size should be 1, 2, or 3.");
        }
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
      }

      assertEquals(6, actual.size());
      assertTrue(actual.containsAll(expected));

      // create file4 on all three racks
      Path file4 = new Path(dir4 + "/file4");
      writeFile(conf, file4, (short)3, 3);
      inFormat = new DummyInputFormat();
      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      inFormat.setMinSplitSizeRack(BLOCKSIZE);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test3): " + split);
      }

      expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
      expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
      expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
      actual.clear();

      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        /**
         * If rack1 is processed first by
         * {@link CombineFileInputFormat#createSplits},
         * create only one split on rack1.
         * If rack2 or rack3 is processed first and rack1 is processed second,
         * create one split on rack2 or rack3 and the other split is on rack1.
         * Otherwise create 3 splits for each rack.
         */
        if (splits.size() == 3) {
          // first split is on rack3, contains file3 and file4
          if (split.equals(splits.get(0))) {
            assertEquals(6, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
          }
          // second split is on rack2, contains file2
          if (split.equals(splits.get(1))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(file2.getName(), fileSplit.getPath(1).getName());
            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          // third split is on rack1, contains file1
          if (split.equals(splits.get(2))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 2) {
          // first split is on rack2 or rack3, with five or six blocks.
          if (split.equals(splits.get(0))) {
            assertEquals(1, fileSplit.getLocations().length);
            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
              assertEquals(5, fileSplit.getNumPaths());
            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
              assertEquals(6, fileSplit.getNumPaths());
            } else {
              fail("First split should be on rack2 or rack3.");
            }
          }
          // second split is on rack1, containing the remaining files.
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 1) {
          // the only split is on rack1, containing all four files.
          assertEquals(1, fileSplit.getLocations().length);
          assertEquals(9, fileSplit.getNumPaths());
          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
        } else {
          fail("Split size should be 1, 2, or 3.");
        }
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
      }

      assertEquals(9, actual.size());
      assertTrue(actual.containsAll(expected));

      // maximum split size is 2 blocks 
      inFormat = new DummyInputFormat();
      inFormat.setMinSplitSizeNode(BLOCKSIZE);
      inFormat.setMaxSplitSize(2*BLOCKSIZE);
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test4): " + split);
      }
      assertEquals(5, splits.size());

      actual.clear();
      reset(mockList);
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }

      assertEquals(9, actual.size());
      assertTrue(actual.containsAll(expected));
      // verify the splits are on all the racks
      verify(mockList, atLeastOnce()).add(hosts1[0]);
      verify(mockList, atLeastOnce()).add(hosts2[0]);
      verify(mockList, atLeastOnce()).add(hosts3[0]);

      // maximum split size is 3 blocks 
      inFormat = new DummyInputFormat();
      inFormat.setMinSplitSizeNode(BLOCKSIZE);
      inFormat.setMaxSplitSize(3*BLOCKSIZE);
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test5): " + split);
      }

      assertEquals(3, splits.size());

      actual.clear();
      reset(mockList);
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }

      assertEquals(9, actual.size());
      assertTrue(actual.containsAll(expected));
      verify(mockList, atLeastOnce()).add(hosts1[0]);
      verify(mockList, atLeastOnce()).add(hosts2[0]);

      // maximum split size is 4 blocks 
      inFormat = new DummyInputFormat();
      inFormat.setMaxSplitSize(4*BLOCKSIZE);
      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test6): " + split);
      }
      assertEquals(3, splits.size());

      actual.clear();
      reset(mockList);
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }

      assertEquals(9, actual.size());
      assertTrue(actual.containsAll(expected));
      verify(mockList, atLeastOnce()).add(hosts1[0]);

      // maximum split size is 7 blocks and min is 3 blocks
      inFormat = new DummyInputFormat();
      inFormat.setMaxSplitSize(7*BLOCKSIZE);
      inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
      inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test7): " + split);
      }

      assertEquals(2, splits.size());

      actual.clear();
      reset(mockList);
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }

      assertEquals(9, actual.size());
      assertTrue(actual.containsAll(expected));
      verify(mockList, atLeastOnce()).add(hosts1[0]);

      // Rack 1 has file1, file2 and file3 and file4
      // Rack 2 has file2 and file3 and file4
      // Rack 3 has file3 and file4
      // set up a filter so that only (file1 and file2) or (file3 and file4)
      // can be combined
      inFormat = new DummyInputFormat();
      FileInputFormat.addInputPath(job, inDir);
      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
      inFormat.createPool(new TestFilter(dir1), 
                          new TestFilter(dir2));
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test1): " + split);
      }

      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        if (splits.size() == 2) {
          // first split is on rack1, contains file1 and file2.
          if (split.equals(splits.get(0))) {
            assertEquals(3, fileSplit.getNumPaths());
            expected.clear();
            expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
            actual.clear();
            for (int i = 0; i < 3; i++) {
              String name = fileSplit.getPath(i).getName();
              long length = fileSplit.getLength(i);
              long offset = fileSplit.getOffset(i);
              actual.add(new Split(name, length, offset));
            }
            assertTrue(actual.containsAll(expected));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(1))) {
            // second split contains file3 and file4; however,
            // its location is nondeterministic.
            assertEquals(6, fileSplit.getNumPaths());
            expected.clear();
            expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
            expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
            actual.clear();
            for (int i = 0; i < 6; i++) {
              String name = fileSplit.getPath(i).getName();
              long length = fileSplit.getLength(i);
              long offset = fileSplit.getOffset(i);
              actual.add(new Split(name, length, offset));
            }
            assertTrue(actual.containsAll(expected));
            assertEquals(1, fileSplit.getLocations().length);
          }
        } else if (splits.size() == 3) {
          if (split.equals(splits.get(0))) {
            // first split is on rack2, contains file2
            assertEquals(2, fileSplit.getNumPaths());
            expected.clear();
            expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
            actual.clear();
            for (int i = 0; i < 2; i++) {
              String name = fileSplit.getPath(i).getName();
              long length = fileSplit.getLength(i);
              long offset = fileSplit.getOffset(i);
              actual.add(new Split(name, length, offset));
            }
            assertTrue(actual.containsAll(expected));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(1))) {
            // second split is on rack1, contains file1
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(2))) {
            // third split contains file3 and file4; however,
            // its location is nondeterministic.
            assertEquals(6, fileSplit.getNumPaths());
            expected.clear();
            expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
            expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
            actual.clear();
            for (int i = 0; i < 6; i++) {
              String name = fileSplit.getPath(i).getName();
              long length = fileSplit.getLength(i);
              long offset = fileSplit.getOffset(i);
              actual.add(new Split(name, length, offset));
            }
            assertTrue(actual.containsAll(expected));
            assertEquals(1, fileSplit.getLocations().length);
          }
        } else {
          fail("Split size should be 2 or 3.");
        }
      }

      // measure performance when there are multiple pools and
      // many files in each pool.
      int numPools = 100;
      int numFiles = 1000;
      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
      // Accumulate numFiles input paths with addInputPath; calling
      // setInputPaths inside the loop would overwrite the list each time
      // and leave the job with a single input file.
      FileInputFormat.setInputPaths(job, file1);
      for (int i = 1; i < numFiles; i++) {
        FileInputFormat.addInputPath(job, file1);
      }
      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
      final Path dirNoMatch2 = new Path(inDir, "/diryy");
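      // Neither filter matches any input path, so every file is tested
      // against all of the pools before falling through, which is the pool
      // scanning cost being measured here.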
      for (int i = 0; i < numPools; i++) {
        inFormat1.createPool(new TestFilter(dirNoMatch1), 
                            new TestFilter(dirNoMatch2));
      }
      long start = System.currentTimeMillis();
      splits = inFormat1.getSplits(job);
      long end = System.currentTimeMillis();
      System.out.println("Elapsed time for " + numPools + " pools " +
                         " and " + numFiles + " files is " + 
                         ((end - start)/1000) + " seconds.");

      // This file has three whole blocks. If the maxsplit size is
      // half the block size, then there should be six splits.
      inFormat = new DummyInputFormat();
      inFormat.setMaxSplitSize(BLOCKSIZE/2);
      FileInputFormat.setInputPaths(job, dir3);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test8): " + split);
      }
      assertEquals(6, splits.size());

    } finally {
      if (dfs != null) {
        dfs.shutdown();
      }
    }
  }

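  /**
   * Creates a file of numBlocks blocks (BLOCKSIZE bytes each) with the given
   * replication factor and waits until the blocks are fully replicated.
   */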
  static void writeFile(Configuration conf, Path name,
                        short replication, int numBlocks)
      throws IOException, TimeoutException, InterruptedException {
    FileSystem fileSys = FileSystem.get(conf);

    FSDataOutputStream stm = fileSys.create(name, true,
                                            conf.getInt("io.file.buffer.size", 4096),
                                            replication, (long)BLOCKSIZE);
    writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
  }

  // Creates a gzip file and returns its FileStatus.
  static FileStatus writeGzipFile(Configuration conf, Path name,
      short replication, int numBlocks)
      throws IOException, TimeoutException, InterruptedException {
    FileSystem fileSys = FileSystem.get(conf);

    GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
        .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
    writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
    return fileSys.getFileStatus(name);
  }

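  // Writes numBlocks copies of databuf, closes the stream, and blocks until
  // HDFS reports the requested replication for the file.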
  private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
        OutputStream out, short replication, int numBlocks)
      throws IOException, TimeoutException, InterruptedException {
    for (int i = 0; i < numBlocks; i++) {
      out.write(databuf);
    }
    out.close();
    DFSTestUtil.waitReplication(fileSys, name, replication);
  }

  @Test
  public void testNodeDistribution() throws IOException, InterruptedException {
    DummyInputFormat inFormat = new DummyInputFormat();
    int numBlocks = 60;
    long totLength = 0;
    long blockSize = 100;
    int numNodes = 10;

    long minSizeNode = 50;
    long minSizeRack = 50;
    int maxSplitSize = 200; // 2 blocks per split.

    String[] locations = new String[numNodes];
    for (int i = 0; i < numNodes; i++) {
      locations[i] = "h" + i;
    }
    String[] racks = new String[0];
    Path path = new Path("hdfs://file");

    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];

    int hostCountBase = 0;
    // Generate block list. Replication 3 per block.
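    // Replica hosts are chosen round-robin, shifting the starting host by one
    // for each successive block so the replicas spread evenly across nodes.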
    for (int i = 0; i < numBlocks; i++) {
      int localHostCount = hostCountBase;
      String[] blockHosts = new String[3];
      for (int j = 0; j < 3; j++) {
        int hostNum = localHostCount % numNodes;
        blockHosts[j] = "h" + hostNum;
        localHostCount++;
      }
      hostCountBase++;
      blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
          racks);
      totLength += blockSize;
    }

    List<InputSplit> splits = new ArrayList<InputSplit>();
    HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
    Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();

    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
        nodeToBlocks, rackToNodes);
    
    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
        maxSplitSize, minSizeNode, minSizeRack, splits);

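    // 60 blocks * 100 bytes / 200-byte max split size = 30 splits.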
    int expectedSplitCount = (int) (totLength / maxSplitSize);
    assertEquals(expectedSplitCount, splits.size());

    // Ensure 90+% of the splits have node local blocks.
    // 100% locality may not always be achieved.
    int numLocalSplits = 0;
    for (InputSplit inputSplit : splits) {
      assertEquals(maxSplitSize, inputSplit.getLength());
      if (inputSplit.getLocations().length == 1) {
        numLocalSplits++;
      }
    }
    assertTrue(numLocalSplits >= 0.9 * splits.size());
  }

  @Test
  public void testNodeInputSplit() throws IOException, InterruptedException {
    // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks
    // on both nodes. The grouping ensures that both nodes get splits instead
    // of just the first node.
    DummyInputFormat inFormat = new DummyInputFormat();
    int numBlocks = 12;
    long totLength = 0;
    long blockSize = 100;
    long maxSize = 200;
    long minSizeNode = 50;
    long minSizeRack = 50;
    String[] locations = { "h1", "h2" };
    String[] racks = new String[0];
    Path path = new Path("hdfs://file");
    
    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
    for(int i=0; i<numBlocks; ++i) {
      blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
      totLength += blockSize;
    }
    
    List<InputSplit> splits = new ArrayList<InputSplit>();
    HashMap<String, Set<String>> rackToNodes = 
                              new HashMap<String, Set<String>>();
    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
                              new HashMap<String, List<OneBlockInfo>>();
    HashMap<OneBlockInfo, String[]> blockToNodes = 
                              new HashMap<OneBlockInfo, String[]>();
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
                              new HashMap<String, Set<OneBlockInfo>>();
    
    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
                             nodeToBlocks, rackToNodes);
    
    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,  
                          maxSize, minSizeNode, minSizeRack, splits);
    
    int expectedSplitCount = (int)(totLength/maxSize);
    assertEquals(expectedSplitCount, splits.size());
    HashMultiset<String> nodeSplits = HashMultiset.create();
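    // Tally the host of each split: 12 blocks grouped into 2-block splits
    // give 6 splits, which should be shared evenly between the two hosts.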
    for(int i=0; i<expectedSplitCount; ++i) {
      InputSplit inSplit = splits.get(i);
      assertEquals(maxSize, inSplit.getLength());
      assertEquals(1, inSplit.getLocations().length);
      nodeSplits.add(inSplit.getLocations()[0]);
    }
    assertEquals(3, nodeSplits.count(locations[0]));
    assertEquals(3, nodeSplits.count(locations[1]));
  }

  /**
   * The test suppresses unchecked warnings in
   * {@link org.mockito.Mockito#reset}. Although calling the method is
   * bad practice, we call it instead of splitting the test
   * (i.e. restarting MiniDFSCluster) to save time.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testSplitPlacementForCompressedFiles() throws Exception {
    MiniDFSCluster dfs = null;
    FileSystem fileSys = null;
    try {
      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five
       * gzipped files:
       * 1) file1 and file5, just after starting the datanode on r1, with 
       *    a repl factor of 1, and,
       * 2) file2, just after starting the datanode on r2, with 
       *    a repl factor of 2, and,
       * 3) file3 and file4, after starting all three datanodes, with a repl
       *    factor of 3.
       * At the end, file1 and file5 will be present only on datanode1, file2
       * will be present on datanode1 and datanode2, and file3 and file4 will
       * be present on all datanodes.
       */
      Configuration conf = new Configuration();
      conf.setBoolean("dfs.replication.considerLoad", false);
      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
          .build();
      dfs.waitActive();

      fileSys = dfs.getFileSystem();
      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }
      Path file1 = new Path(dir1 + "/file1.gz");
      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
      // create another file on the same datanode
      Path file5 = new Path(dir5 + "/file5.gz");
      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
      // split it using a CombineFileInputFormat
      DummyInputFormat inFormat = new DummyInputFormat();
      Job job = Job.getInstance(conf);
      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
      List<InputSplit> splits = inFormat.getSplits(job);
      System.out.println("Made splits(Test0): " + splits.size());
      for (InputSplit split : splits) {
        System.out.println("File split(Test0): " + split);
      }
      assertEquals(1, splits.size());
      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
      assertEquals(2, fileSplit.getNumPaths());
      assertEquals(1, fileSplit.getLocations().length);
      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
      assertEquals(0, fileSplit.getOffset(0));
      assertEquals(f1.getLen(), fileSplit.getLength(0));
      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
      assertEquals(0, fileSplit.getOffset(1));
      assertEquals(f5.getLen(), fileSplit.getLength(1));
      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
      
      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
      dfs.waitActive();

      // create file on two datanodes.
      Path file2 = new Path(dir2 + "/file2.gz");
      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);

      // split it using a CombineFileInputFormat
      inFormat = new DummyInputFormat();
      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
      inFormat.setMinSplitSizeRack(f1.getLen());
      splits = inFormat.getSplits(job);
      System.out.println("Made splits(Test1): " + splits.size());

      // each split should end up with a different location
      for (InputSplit split : splits) {
        System.out.println("File split(Test1): " + split);
      }

      Set<Split> expected = new HashSet<>();
      expected.add(new Split(file1.getName(), f1.getLen(), 0));
      expected.add(new Split(file2.getName(), f2.getLen(), 0));
      List<Split> actual = new ArrayList<>();

      /**
       * If rack1 is processed first by
       * {@link CombineFileInputFormat#createSplits},
       * create only one split on rack1. Otherwise create two splits.
       */
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        if (splits.size() == 2) {
          if (split.equals(splits.get(0))) {
            // first split is on rack2, contains file2.
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(f2.getLen(), fileSplit.getLength(0));
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(1))) {
            // second split is on rack1, contains file1.
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(f1.getLen(), fileSplit.getLength(0));
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 1) {
          // first split is on rack1, contains file1 and file2.
          assertEquals(2, fileSplit.getNumPaths());
          assertEquals(1, fileSplit.getLocations().length);
          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
        } else {
          fail("Split size should be 1 or 2.");
        }
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
      }
      assertEquals(2, actual.size());
      assertTrue(actual.containsAll(expected));

      // create another file on 3 datanodes and 3 racks.
      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
      dfs.waitActive();
      Path file3 = new Path(dir3 + "/file3.gz");
      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
      inFormat = new DummyInputFormat();
      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
      inFormat.setMinSplitSizeRack(f1.getLen());
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test2): " + split);
      }

      expected.add(new Split(file3.getName(), f3.getLen(), 0));
      actual.clear();

      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        /**
         * If rack1 is processed first by
         * {@link CombineFileInputFormat#createSplits},
         * create only one split on rack1.
         * If rack2 or rack3 is processed first and rack1 is processed second,
         * create one split on rack2 or rack3 and the other split is on rack1.
         * Otherwise create 3 splits for each rack.
         */
        if (splits.size() == 3) {
          // first split is on rack3, contains file3
          if (split.equals(splits.get(0))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(file3.getName(), fileSplit.getPath(0).getName());
            assertEquals(f3.getLen(), fileSplit.getLength(0));
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
          }
          // second split is on rack2, contains file2
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
            assertEquals(f2.getLen(), fileSplit.getLength(0));
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          // third split is on rack1, contains file1
          if (split.equals(splits.get(2))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(f1.getLen(), fileSplit.getLength(0));
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 2) {
          // first split is on rack2 or rack3, contains one or two files.
          if (split.equals(splits.get(0))) {
            assertEquals(1, fileSplit.getLocations().length);
            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
              assertEquals(2, fileSplit.getNumPaths());
            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
              assertEquals(1, fileSplit.getNumPaths());
            } else {
              fail("First split should be on rack2 or rack3.");
            }
          }
          // second split is on rack1, containing the remaining files.
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 1) {
          // the only split is on rack1, containing all three files.
          assertEquals(1, fileSplit.getLocations().length);
          assertEquals(3, fileSplit.getNumPaths());
          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
        } else {
          fail("Split size should be 1, 2, or 3.");
        }
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
      }

      assertEquals(3, actual.size());
      assertTrue(actual.containsAll(expected));

      // create file4 on all three racks
      Path file4 = new Path(dir4 + "/file4.gz");
      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
      inFormat = new DummyInputFormat();
      FileInputFormat.setInputPaths(job,
          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      inFormat.setMinSplitSizeRack(f1.getLen());
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test3): " + split);
      }

      expected.add(new Split(file4.getName(), f4.getLen(), 0));
      actual.clear();

      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        /**
         * If rack1 is processed first by
         * {@link CombineFileInputFormat#createSplits},
         * create only one split on rack1.
         * If rack2 or rack3 is processed first and rack1 is processed second,
         * create one split on rack2 or rack3 and the other split is on rack1.
         * Otherwise create 3 splits for each rack.
         */
        if (splits.size() == 3) {
          // first split is on rack3, contains file3 and file4
          if (split.equals(splits.get(0))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
          }
          // second split is on rack2, contains file2
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
            assertEquals(f2.getLen(), fileSplit.getLength(0));
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          // third split is on rack1, contains file1
          if (split.equals(splits.get(2))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
            assertEquals(f1.getLen(), fileSplit.getLength(0));
            assertEquals(0, fileSplit.getOffset(0));
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 2) {
          // first split is on rack2 or rack3, contains two or three files.
          if (split.equals(splits.get(0))) {
            assertEquals(1, fileSplit.getLocations().length);
            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
              assertEquals(3, fileSplit.getNumPaths());
            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
              assertEquals(2, fileSplit.getNumPaths());
            } else {
              fail("First split should be on rack2 or rack3.");
            }
          }
          // second split is on rack1 and contains the remaining files.
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 1) {
          // the only split is on rack1 and contains all four files.
          assertEquals(1, fileSplit.getLocations().length);
          assertEquals(4, fileSplit.getNumPaths());
          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
        } else {
          fail("Number of splits should be 1, 2, or 3.");
        }
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
      }

      assertEquals(4, actual.size());
      assertTrue(actual.containsAll(expected));

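      // Test4: with the node-level minimum and the overall maximum split
      // size both pinned to file1's length, no two files can share a
      // split, so each of the four files should become its own split.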
      // maximum split size is file1's length
      inFormat = new DummyInputFormat();
      inFormat.setMinSplitSizeNode(f1.getLen());
      inFormat.setMaxSplitSize(f1.getLen());
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test4): " + split);
      }
      assertEquals(4, splits.size());

      actual.clear();
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }

      assertEquals(4, actual.size());
      assertTrue(actual.containsAll(expected));
      verify(mockList, atLeastOnce()).add(hosts1[0]);
      verify(mockList, atLeastOnce()).add(hosts2[0]);
      verify(mockList, atLeastOnce()).add(hosts3[0]);

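      // Test5: doubling the cap allows at most two files per split, so we
      // may get either three splits (one per rack) or two, again depending
      // on rack processing order.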
      // maximum split size is twice file1's length
      inFormat = new DummyInputFormat();
      inFormat.setMinSplitSizeNode(f1.getLen());
      inFormat.setMaxSplitSize(2 * f1.getLen());
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test5): " + split);
      }

      actual.clear();
      reset(mockList);
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }
      assertEquals(4, actual.size());
      assertTrue(actual.containsAll(expected));

      if (splits.size() == 3) {
        // splits are on all the racks
        verify(mockList, times(1)).add(hosts1[0]);
        verify(mockList, times(1)).add(hosts2[0]);
        verify(mockList, times(1)).add(hosts3[0]);
      } else if (splits.size() == 2) {
        // one split is on rack1, another split is on rack2 or rack3
        verify(mockList, times(1)).add(hosts1[0]);
      } else {
        fail("Number of splits should be 2 or 3.");
      }

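      // Test6: a cap of four file1-lengths is large enough for all four
      // files, so a single split results whenever rack1 is processed
      // first; otherwise two splits are created.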
      // maximum split size is 4 times file1's length 
      inFormat = new DummyInputFormat();
      inFormat.setMinSplitSizeNode(2 * f1.getLen());
      inFormat.setMaxSplitSize(4 * f1.getLen());
      FileInputFormat.setInputPaths(job,
          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test6): " + split);
      }

      /**
       * If rack1 is processed first by
       * {@link CombineFileInputFormat#createSplits},
       * create only one split on rack1. Otherwise create two splits.
       */
      assertTrue("Split size should be 1 or 2.",
          splits.size() == 1 || splits.size() == 2);
      actual.clear();
      reset(mockList);
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
        mockList.add(fileSplit.getLocations()[0]);
      }
      assertEquals(4, actual.size());
      assertTrue(actual.containsAll(expected));
      verify(mockList, times(1)).add(hosts1[0]);

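      // Test7: the per-rack minimum now equals the cap, so all four files
      // are expected to collapse into one split on rack1, the only rack
      // holding replicas of every file.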
      // maximum split size and min-split-size per rack are 4 times file1's length
      inFormat = new DummyInputFormat();
      inFormat.setMaxSplitSize(4 * f1.getLen());
      inFormat.setMinSplitSizeRack(4 * f1.getLen());
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test7): " + split);
      }
      assertEquals(1, splits.size());
      fileSplit = (CombineFileSplit) splits.get(0);
      assertEquals(4, fileSplit.getNumPaths());
      assertEquals(1, fileSplit.getLocations().length);
      assertEquals(hosts1[0], fileSplit.getLocations()[0]);

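      // Test8: similarly, a per-node minimum of four file1-lengths
      // prevents node-local splits, so everything should again end up in
      // a single combined split on rack1.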
      // minimum split size per node is 4 times file1's length
      inFormat = new DummyInputFormat();
      inFormat.setMinSplitSizeNode(4 * f1.getLen());
      FileInputFormat.setInputPaths(job, 
        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test8): " + split);
      }
      assertEquals(1, splits.size());
      fileSplit = (CombineFileSplit) splits.get(0);
      assertEquals(4, fileSplit.getNumPaths());
      assertEquals(1, fileSplit.getLocations().length);
      assertEquals(hosts1[0], fileSplit.getLocations()[0]);

      // Rack 1 has file1, file2, file3 and file4.
      // Rack 2 has file2, file3 and file4.
      // Rack 3 has file3 and file4.
      // Set up a pool so that only file1 and file2 can be combined.
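      // Files accepted by the pool's filters (those under dir1 or dir2)
      // may only be combined with each other; file3 and file4 are
      // combined separately, outside the pool.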
      inFormat = new DummyInputFormat();
      FileInputFormat.addInputPath(job, inDir);
      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
      inFormat.createPool(new TestFilter(dir1),
          new TestFilter(dir2));
      splits = inFormat.getSplits(job);
      for (InputSplit split : splits) {
        System.out.println("File split(Test9): " + split);
      }

      actual.clear();
      for (InputSplit split : splits) {
        fileSplit = (CombineFileSplit) split;
        if (splits.size() == 3) {
          // If rack2 is processed first
          if (split.equals(splits.get(0))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(1))) {
            assertEquals(1, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(2))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
          }
        } else if (splits.size() == 2) {
          // If rack1 is processed first
          if (split.equals(splits.get(0))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
          }
          if (split.equals(splits.get(1))) {
            assertEquals(2, fileSplit.getNumPaths());
            assertEquals(1, fileSplit.getLocations().length);
            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
          }
        } else {
          fail("Number of splits should be 2 or 3.");
        }
        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
          String name = fileSplit.getPath(i).getName();
          long length = fileSplit.getLength(i);
          long offset = fileSplit.getOffset(i);
          actual.add(new Split(name, length, offset));
        }
      }
      assertEquals(4, actual.size());
      assertTrue(actual.containsAll(expected));

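      // This is only a timing smoke test: the elapsed time is printed but
      // not asserted on.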
      // measure performance when there are multiple pools and
      // many files in each pool.
      int numPools = 100;
      int numFiles = 1000;
      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
      // setInputPaths replaces the job's input paths on every call, so use
      // addInputPath to actually accumulate numFiles input paths.
      FileInputFormat.setInputPaths(job, file1);
      for (int i = 1; i < numFiles; i++) {
        FileInputFormat.addInputPath(job, file1);
      }
      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
      final Path dirNoMatch2 = new Path(inDir, "/diryy");
      for (int i = 0; i < numPools; i++) {
        inFormat1.createPool(new TestFilter(dirNoMatch1), 
                            new TestFilter(dirNoMatch2));
      }
      long start = System.currentTimeMillis();
      splits = inFormat1.getSplits(job);
      long end = System.currentTimeMillis();
      System.out.println("Elapsed time for " + numPools + " pools " +
                         " and " + numFiles + " files is " + 
                         ((end - start)) + " milli seconds.");
    } finally {
      if (dfs != null) {
        dfs.shutdown();
      }
    }
  }

  /**
   * Test that CFIF can handle missing blocks.
   */
  @Test
  public void testMissingBlocks() throws Exception {
    final Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl", MissingBlockFileSystem.class.getName());
    conf.setBoolean("dfs.replication.considerLoad", false);
    try (MiniDFSCluster dfs = new MiniDFSCluster.Builder(conf)
        .racks(rack1).hosts(hosts1).build()) {
      dfs.waitActive();

      final FileSystem fileSys =
          MissingBlockFileSystem.newInstance(dfs.getURI(), conf);
      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }

      Path file1 = new Path(dir1 + "/file1");
      writeFile(conf, file1, (short)1, 1);
      // create another file on the same datanode
      Path file5 = new Path(dir5 + "/file5");
      writeFile(conf, file5, (short)1, 1);

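      // MissingBlockFileSystem simulates a file whose block locations
      // cannot be resolved; CFIF should still place file1's block in a
      // split instead of failing.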
      ((MissingBlockFileSystem) fileSys)
          .setFileWithMissingBlocks(file1.toUri().getPath());
      // split it using a CombineFile input format
      DummyInputFormat inFormat = new DummyInputFormat();
      Job job = Job.getInstance(conf);
      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
      List<InputSplit> splits = inFormat.getSplits(job);
      System.out.println("Made splits(Test0): " + splits.size());
      for (InputSplit split : splits) {
        System.out.println("File split(Test0): " + split);
      }
      assertThat(splits.size()).isEqualTo(1);
      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
      assertEquals(2, fileSplit.getNumPaths());
      assertEquals(1, fileSplit.getLocations().length);
      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
      assertEquals(0, fileSplit.getOffset(0));
      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
      assertEquals(0, fileSplit.getOffset(1));
      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
    }
  }
  
  /**
   * Test when the input file's length is 0.
   */
  @Test
  public void testForEmptyFile() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fileSys = FileSystem.get(conf);
    Path file = new Path("test" + "/file");
    FSDataOutputStream out = fileSys.create(file, true,
        conf.getInt("io.file.buffer.size", 4096), (short) 1, (long) BLOCKSIZE);
    out.write(new byte[0]);
    out.close();

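    // CFIF should emit exactly one split containing a single zero-length
    // chunk for the empty file.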
    // split it using a CombineFile input format
    DummyInputFormat inFormat = new DummyInputFormat();
    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, "test");
    List<InputSplit> splits = inFormat.getSplits(job);
    assertEquals(1, splits.size());
    CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
    assertEquals(1, fileSplit.getNumPaths());
    assertEquals(file.getName(), fileSplit.getPath(0).getName());
    assertEquals(0, fileSplit.getOffset(0));
    assertEquals(0, fileSplit.getLength(0));

    fileSys.delete(file.getParent(), true);
  }

  /**
   * Test that directories do not get included as part of getSplits()
   */
  @Test
  public void testGetSplitsWithDirectory() throws Exception {
    MiniDFSCluster dfs = null;
    try {
      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
          .build();
      dfs.waitActive();

      FileSystem fileSys = dfs.getFileSystem();

      // Set up the following directory structure:
      // /dir1/: directory
      // /dir1/file1: regular file
      // /dir1/dir2/: directory
      Path dir1 = new Path("/dir1");
      Path file = new Path("/dir1/file1");
      Path dir2 = new Path("/dir1/dir2");
      if (!fileSys.mkdirs(dir1)) {
        throw new IOException("Mkdirs failed to create " + dir1.toString());
      }
      FSDataOutputStream out = fileSys.create(file);
      out.write(new byte[0]);
      out.close();
      if (!fileSys.mkdirs(dir2)) {
        throw new IOException("Mkdirs failed to create " + dir2.toString());
      }

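      // Only the regular file should contribute a chunk to the splits;
      // dir2 must not appear even though it sits under the input path.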
      // split it using a CombineFile input format
      DummyInputFormat inFormat = new DummyInputFormat();
      Job job = Job.getInstance(conf);
      FileInputFormat.setInputPaths(job, "/dir1");
      List<InputSplit> splits = inFormat.getSplits(job);

      // directories should be omitted from getSplits(); we should only see
      // file1, not dir2
      assertEquals(1, splits.size());
      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
      assertEquals(1, fileSplit.getNumPaths());
      assertEquals(file.getName(), fileSplit.getPath(0).getName());
      assertEquals(0, fileSplit.getOffset(0));
      assertEquals(0, fileSplit.getLength(0));
    } finally {
      if (dfs != null) {
        dfs.shutdown();
      }
    }
  }

  /**
   * Test when input files are from non-default file systems.
   */
  @Test
  public void testForNonDefaultFileSystem() throws Throwable {
    Configuration conf = new Configuration();

    // use a fake file system scheme as default
    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);

    // default fs path
    assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
    // add a local file
    String localPathRoot = System.getProperty("test.build.data",
        "build/test/data");
    Path localPath = new Path(localPathRoot, "testFile1");
    FileSystem lfs = FileSystem.getLocal(conf);
    FSDataOutputStream dos = lfs.create(localPath);
    dos.writeChars("Local file for CFIF");
    dos.close();

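    // The input path is qualified against the local file system below, so
    // its "file" scheme should be preserved even though the default file
    // system is the dummy scheme.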
    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
    DummyInputFormat inFormat = new DummyInputFormat();
    List<InputSplit> splits = inFormat.getSplits(job);
    assertTrue(splits.size() > 0);
    for (InputSplit s : splits) {
      CombineFileSplit cfs = (CombineFileSplit)s;
      for (Path p : cfs.getPaths()) {
        assertThat(p.toUri().getScheme()).isEqualTo("file");
      }
    }
  }

  static class TestFilter implements PathFilter {
    private final Path p;

    // store a path prefix in this TestFilter
    public TestFilter(Path p) {
      this.p = p;
    }

    // returns true if the specified path starts with the prefix stored
    // in this TestFilter.
    @Override
    public boolean accept(Path path) {
      return path.toUri().getPath().startsWith(p.toString());
    }

    @Override
    public String toString() {
      return "PathFilter:" + p;
    }
  }

  /**
   * Prints out the input splits for the specified files.
   */
  private void splitRealFiles(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance();
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IOException("Wrong file system: " + fs.getClass().getName());
    }
    long blockSize = fs.getDefaultBlockSize();

    DummyInputFormat inFormat = new DummyInputFormat();
    for (String arg : args) {
      FileInputFormat.addInputPaths(job, arg);
    }
    inFormat.setMinSplitSizeRack(blockSize);
    inFormat.setMaxSplitSize(10 * blockSize);
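    // i.e. rack-level splits must cover at least one block, and combined
    // splits are capped at ten blocks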

    List<InputSplit> splits = inFormat.getSplits(job);
    System.out.println("Total number of splits " + splits.size());
    for (int i = 0; i < splits.size(); ++i) {
      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(i);
      System.out.println("Split[" + i + "] " + fileSplit);
    }
  }

  public static void main(String[] args) throws Exception {
    TestCombineFileInputFormat test = new TestCombineFileInputFormat();
    // if some paths are specified, split those real files;
    // otherwise run the MiniDFSCluster-based split placement test
    if (args.length != 0) {
      test.splitRealFiles(args);
    } else {
      test.testSplitPlacement();
    }
  }
}