TestJobSplitWriterWithEC.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.split;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;


/**
 * Tests that the default maxBlockLocations limit
 * (mapreduce.job.max.split.locations) is large enough for the 14 block
 * locations (10 data + 4 parity) produced by the RS-10-4 erasure coding
 * policy.
 */
public class TestJobSplitWriterWithEC {
  // RS-10-4 stripes each block group across 10 data + 4 parity blocks,
  // so every split reports 14 block locations.
  private final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
      .getByID(SystemErasureCodingPolicies.RS_10_4_POLICY_ID);
  private static final int BLOCKSIZE = 1024 * 1024 * 10;

  private MiniDFSCluster cluster;
  private DistributedFileSystem fs;
  private Configuration conf;
  private Path submitDir;
  private Path testFile;

  @BeforeEach
  public void setup() throws Exception {
    Configuration hdfsConf = new HdfsConfiguration();
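    // Remove the NameNode's minimum block size restriction so the custom
    // test block size configured below is accepted.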
    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
    String namenodeDir = new File(MiniDFSCluster.getBaseDirectory(), "name").
        getAbsolutePath();
    hdfsConf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, namenodeDir);
    hdfsConf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeDir);
    hdfsConf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
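    // 15 DataNodes can host all 14 internal blocks (10 data + 4 parity)
    // of an RS-10-4 block group.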
    cluster = new MiniDFSCluster.Builder(hdfsConf).numDataNodes(15).build();
    fs = cluster.getFileSystem();
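    // Enable RS-10-4 and apply it to the filesystem root so the test file
    // is erasure coded.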
    fs.enableErasureCodingPolicy(ecPolicy.getName());
    fs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
    cluster.waitActive();

    conf = new Configuration();
    submitDir = new Path("/");
    testFile = new Path("/testfile");
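    // Write one block's worth of data; striped under RS-10-4 it spans all
    // 14 internal blocks of a single block group.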
    DFSTestUtil.writeFile(fs, testFile,
        StripedFileTestUtil.generateBytes(BLOCKSIZE));
    conf.set(FileInputFormat.INPUT_DIR,
        fs.getUri().toString() + testFile.toString());
  }

  @AfterEach
  public void after() {
    if (cluster != null) {
      cluster.close();
    }
  }

  @Test
  public void testMaxBlockLocationsNewSplitsWithErasureCoding()
      throws Exception {
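    // New mapreduce API: compute the splits with TextInputFormat and
    // persist them with JobSplitWriter.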
    Job job = Job.getInstance(conf);
    final FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
    final List<InputSplit> splits = fileInputFormat.getSplits(job);

    JobSplitWriter.createSplitFiles(submitDir, conf, fs, splits);

    validateSplitMetaInfo();
  }

  @Test
  public void testMaxBlockLocationsOldSplitsWithErasureCoding()
      throws Exception {
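    // Old mapred API: build the InputSplit[] with the classic
    // TextInputFormat and persist it the same way.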
    JobConf jobConf = new JobConf(conf);
    org.apache.hadoop.mapred.TextInputFormat fileInputFormat
        = new org.apache.hadoop.mapred.TextInputFormat();
    fileInputFormat.configure(jobConf);
    final org.apache.hadoop.mapred.InputSplit[] splits =
        fileInputFormat.getSplits(jobConf, 1);

    JobSplitWriter.createSplitFiles(submitDir, conf, fs, splits);

    validateSplitMetaInfo();
  }

  private void validateSplitMetaInfo() throws IOException {
    JobSplit.TaskSplitMetaInfo[] splitInfo =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
            submitDir);

    assertEquals(1, splitInfo.length, "Number of splits");
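    // 10 data + 4 parity internal blocks => 14 block locations for the
    // single RS-10-4 block group.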
    assertEquals(14, splitInfo[0].getLocations().length,
        "Number of block locations");
  }
}