/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.File;
import java.util.Random;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Distributed threaded map benchmark.
 * <p>
 * This benchmark generates random data per map and tests the performance 
 * of having multiple spills (using multiple threads) over having just one 
 * spill. The following parameters can be specified:
 * <ul>
 *   <li>File size per map.</li>
 *   <li>Number of spills per map.</li>
 *   <li>Number of maps per host.</li>
 * </ul>
 * <p>
 * Sort is used for benchmarking the performance. 
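 * <p>
 * Example invocation (a sketch; any driver can launch the benchmark through
 * {@link org.apache.hadoop.util.ToolRunner}, as {@code main} does, and the
 * option names match the usage string printed by {@code run}):
 * <pre>
 * int exitCode = ToolRunner.run(new ThreadedMapBenchmark(),
 *     new String[] {"-dataSizePerMap", "128",
 *                   "-numSpillsPerMap", "2",
 *                   "-numMapsPerHost", "1"});
 * </pre>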
 */

public class ThreadedMapBenchmark extends Configured implements Tool {

  private static final Logger LOG =
      LoggerFactory.getLogger(ThreadedMapBenchmark.class);
  private static final Path BASE_DIR =
    new Path(System.getProperty("test.build.data",
                                File.separator + "benchmarks" + File.separator
                                + "ThreadedMapBenchmark"));
  private static final Path INPUT_DIR = new Path(BASE_DIR, "input");
  private static final Path OUTPUT_DIR = new Path(BASE_DIR, "output");
  private static final float FACTOR = 2.3f; // mapreduce.task.io.sort.mb set to 
                                            // (FACTOR * data_size) should 
                                            // result in only 1 spill
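  // e.g. with the default 128 MB of data per map, the single-spill run below
  // sets io.sort.mb to ceil(2.3 * 128) = 295 (see run())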

  enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
  
  /**
   * Generates random input data of a given total size, with keys and values
   * of configurable sizes. By default each map writes 128 MB of data with
   * 10-byte keys and 10-byte values; the sizes are read from the
   * {@code test.tmb.*} job properties in {@link #configure(JobConf)}.
   */
  public static class Map extends MapReduceBase
  implements Mapper<WritableComparable, Writable,
                    BytesWritable, BytesWritable> {
  
    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    /** Fills data[offset, offset + length) with random bytes. */
    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    public void map(WritableComparable key,
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output,
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize
                        + (keySizeRange != 0
                           ? random.nextInt(keySizeRange)
                           : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize
                          + (valueSizeRange != 0
                             ? random.nextInt(valueSizeRange)
                             : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        // count the actual bytes emitted for this record
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". "
                             + numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }

    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.tmb.bytes_per_map",
                                    128 * 1024 * 1024);
      minKeySize = job.getInt("test.tmb.min_key", 10);
      keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize;
      minValueSize = job.getInt("test.tmb.min_value", 10);
      valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize;
    }
  }

  /**
   * Generates the input data for the benchmark by running a map-only
   * random-writer job: {@code numMapsPerHost} maps per task tracker, each
   * writing {@code dataSizePerMap} MB of random key/value data to
   * {@code INPUT_DIR}.
   */
  public static void generateInputData(int dataSizePerMap, 
                                       int numSpillsPerMap, 
                                       int numMapsPerHost, 
                                       JobConf masterConf) 
  throws Exception { 
    JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
    job.setJobName("threaded-map-benchmark-random-writer");
    job.setJarByClass(ThreadedMapBenchmark.class);
    job.setInputFormat(UtilsForTests.RandomInputFormat.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    
    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    long totalDataSize = (long) dataSizePerMap * numMapsPerHost
                         * cluster.getTaskTrackers();
    // use long arithmetic so sizes of 2 GB or more per map do not overflow
    job.set("test.tmb.bytes_per_map",
            String.valueOf(dataSizePerMap * 1024 * 1024L));
    job.setNumReduceTasks(0); // map-only job, no reduces
    job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
    FileOutputFormat.setOutputPath(job, INPUT_DIR);
    
    FileSystem fs = FileSystem.get(job);
    fs.delete(BASE_DIR, true);
    
    LOG.info("Generating random input for the benchmark");
    LOG.info("Total data : " + totalDataSize + " mb");
    LOG.info("Data per map: " + dataSizePerMap + " mb");
    LOG.info("Number of spills : " + numSpillsPerMap);
    LOG.info("Number of maps per host : " + numMapsPerHost);
    LOG.info("Number of hosts : " + cluster.getTaskTrackers());
    
    JobClient.runJob(job); // generates the input for the benchmark
  }

  /**
   * This is the main routine for launching the benchmark. It generates random 
   * input data. The input is non-splittable. Sort is used for benchmarking. 
   * This benchmark reports the effect of having multiple sort and spill 
   * cycles over a single sort and spill. 
   * 
   * @throws Exception if the input-generation or sort jobs fail
   */
  @Override
  public int run(String[] args) throws Exception {
    LOG.info("Starting the benchmark for threaded spills");
    String version = "ThreadedMapBenchmark.0.0.1";
    System.out.println(version);
    
    String usage = 
      "Usage: threadedmapbenchmark " +
      "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " + 
      "[-numSpillsPerMap <number of spills per map, default is 2>] " +
      "[-numMapsPerHost <number of maps per host, default is 1>]";
    
    int dataSizePerMap = 128; // in mb
    int numSpillsPerMap = 2;
    int numMapsPerHost = 1;
    JobConf masterConf = new JobConf(getConf());
    
    for (int i = 0; i < args.length; i++) { // parse command line
      if (args[i].equals("-dataSizePerMap")) {
        dataSizePerMap = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-numSpillsPerMap")) {
        numSpillsPerMap = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-numMapsPerHost")) {
        numMapsPerHost = Integer.parseInt(args[++i]);
      } else {
        System.err.println(usage);
        System.exit(-1);
      }
    }
    
    // verify arguments
    if (dataSizePerMap < 1 || numSpillsPerMap < 1 || numMapsPerHost < 1) {
      System.err.println(usage);
      System.exit(-1);
    }
    
    FileSystem fs = null;
    try {
      // using random-writer to generate the input data
      generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost, 
                        masterConf);
      
      // configure job for sorting
      JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
      job.setJobName("threaded-map-benchmark-unspilled");
      job.setJarByClass(ThreadedMapBenchmark.class);

      job.setInputFormat(NonSplitableSequenceFileInputFormat.class);
      job.setOutputFormat(SequenceFileOutputFormat.class);
      
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      
      job.setMapperClass(IdentityMapper.class);        
      job.setReducerClass(IdentityReducer.class);
      
      FileInputFormat.addInputPath(job, INPUT_DIR);
      FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
      
      JobClient client = new JobClient(job);
      ClusterStatus cluster = client.getClusterStatus();
      job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
      job.setNumReduceTasks(1);
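      // the single reducer does the same shuffle/merge work in both runs, so
      // timing differences come from the map-side spill configuration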
      
      // set mapreduce.task.io.sort.mb to avoid spill
      int ioSortMb = (int)Math.ceil(FACTOR * dataSizePerMap);
      job.set(JobContext.IO_SORT_MB, String.valueOf(ioSortMb));
      fs = FileSystem.get(job);
      
      LOG.info("Running sort with 1 spill per map");
      long startTime = System.currentTimeMillis();
      JobClient.runJob(job);
      long endTime = System.currentTimeMillis();
      
      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
               + " millisec");
      fs.delete(OUTPUT_DIR, true);
      
      // set mapreduce.task.io.sort.mb to have multiple spills
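      // e.g. dataSizePerMap = 128 and numSpillsPerMap = 2 gives
      // io.sort.mb = ceil(2.3 * ceil(128 / 2.0)) = 148, so each map's
      // 128 MB of output should spill roughly twice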
      JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class);
      ioSortMb = (int)Math.ceil(FACTOR 
                                * Math.ceil((double)dataSizePerMap 
                                            / numSpillsPerMap));
      spilledJob.set(JobContext.IO_SORT_MB, String.valueOf(ioSortMb));
      spilledJob.setJobName("threaded-map-benchmark-spilled");
      spilledJob.setJarByClass(ThreadedMapBenchmark.class);
      
      LOG.info("Running sort with " + numSpillsPerMap + " spills per map");
      startTime = System.currentTimeMillis();
      JobClient.runJob(spilledJob);
      endTime = System.currentTimeMillis();
      
      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
               + " millisec");
    } finally {
      if (fs != null) {
        fs.delete(BASE_DIR, true);
      }
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ThreadedMapBenchmark(), args);
    System.exit(res);
  }
}