/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A sample MapReduce job that helps with testing large sorts in the
 * MapReduce framework. Each mapper generates the specified number of bytes
 * of random key/value data and pipes them to the reducers, which discard
 * everything they receive.
 *
 * <ul>
 * <li><code>mapreduce.large-sorter.mbs-per-map</code> specifies the amount
 * of data (in MB) to generate per map. By default, this is twice the value
 * of <code>mapreduce.task.io.sort.mb</code>, or 1 GB if that is not
 * specified either.</li>
 * <li><code>mapreduce.large-sorter.map-tasks</code> specifies the number
 * of map tasks to run.</li>
 * <li><code>mapreduce.large-sorter.reduce-tasks</code> specifies the number
 * of reduce tasks to run.</li>
 * </ul>
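 *
 * <p>A typical invocation through {@link ToolRunner} might look like the
 * following (the jar name here is illustrative, not a fixed artifact name):
 * <pre>{@code
 * hadoop jar hadoop-mapreduce-test.jar org.apache.hadoop.mapreduce.LargeSorter \
 *     -D mapreduce.large-sorter.mbs-per-map=256 \
 *     -D mapreduce.large-sorter.map-tasks=4 \
 *     -D mapreduce.large-sorter.reduce-tasks=2
 * }</pre>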
 */
public class LargeSorter extends Configured implements Tool {
  private static final String LS_PREFIX = "mapreduce.large-sorter.";

  public static final String MBS_PER_MAP = LS_PREFIX + "mbs-per-map";
  public static final String NUM_MAP_TASKS = LS_PREFIX + "map-tasks";
  public static final String NUM_REDUCE_TASKS = LS_PREFIX + "reduce-tasks";

  private static final String MAX_VALUE = LS_PREFIX + "max-value";
  private static final String MIN_VALUE = LS_PREFIX + "min-value";
  private static final String MIN_KEY = LS_PREFIX + "min-key";
  private static final String MAX_KEY = LS_PREFIX + "max-key";
  
  /**
   * User counters
   */
  enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
  
  /**
   * A custom input format that creates a virtual input of a single string
   * for each map.
   */
  static class RandomInputFormat extends InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, each pointing at a
     * dummy file under the job's output directory. No real input is read;
     * the split's file name is simply handed to the map as its single key.
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> result = new ArrayList<InputSplit>();
      Path outDir = FileOutputFormat.getOutputPath(job);
      int numSplits = 
            job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
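      // One dummy split per requested map task; each split names a
      // placeholder file that never actually exists.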
      for(int i=0; i < numSplits; ++i) {
        result.add(new FileSplit(
            new Path(outDir, "dummy-split-" + i), 0, 1, null));
      }
      return result;
    }

    /**
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader extends RecordReader<Text, Text> {
      Path name;
      Text key = null;
      Text value = new Text();
      public RandomRecordReader(Path p) {
        name = p;
      }
      
      public void initialize(InputSplit split,
                             TaskAttemptContext context)
          throws IOException, InterruptedException {
      }
      
      public boolean nextKeyValue() {
        if (name != null) {
          key = new Text();
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }
      
      public Text getCurrentKey() {
        return key;
      }
      
      public Text getCurrentValue() {
        return value;
      }
      
      public void close() {}

      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  static class RandomMapper extends Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {
    
    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();
    
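    /** Fill data[offset..offset+length) with random bytes. */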
    private void randomizeBytes(byte[] data, int offset, int length) {
      for(int i=offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
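      // Default to twice mapreduce.task.io.sort.mb (which itself defaults
      // to 512 MB here), i.e. 1 GB per map if neither property is set.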
      numBytesToWrite = 1024 * 1024 * conf.getLong(MBS_PER_MAP,
          2 * conf.getInt(MRJobConfig.IO_SORT_MB, 512));
      minKeySize = conf.getInt(MIN_KEY, 10);
      keySizeRange =
          conf.getInt(MAX_KEY, 1000) - minKeySize;
      minValueSize = conf.getInt(MIN_VALUE, 0);
      valueSizeRange =
          conf.getInt(MAX_VALUE, 20000) - minValueSize;
    }

    /**
     * Generate random key/value records and write them to the context
     * until the configured number of bytes has been produced.
     */
    public void map(WritableComparable key, 
                    Writable value,
                    Context context) throws IOException,InterruptedException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize + 
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        context.write(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
        if (++itemCount % 200 == 0) {
          context.setStatus("wrote record " + itemCount + ". " + 
                             numBytesToWrite + " bytes left.");
        }
      }
      context.setStatus("done with " + itemCount + " records.");
    }
  }

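  /**
   * A reducer that consumes all of its input and emits nothing, so the
   * job exercises the map, sort, and shuffle paths without writing any
   * meaningful output.
   */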
  static class Discarder extends Reducer<BytesWritable, BytesWritable,
      WritableComparable, Writable> {
    @Override
    public void reduce(BytesWritable key, Iterable<BytesWritable> values,
        Context context) throws IOException, InterruptedException {
      // Do nothing
    }
  }

  private void verifyNotZero(Configuration conf, String config) {
    if (conf.getInt(config, 1) <= 0) {
      throw new IllegalArgumentException(config + " should be > 0");
    }
  }

  public int run(String[] args) throws Exception {
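    // Write to a throwaway output directory; it is deleted in the
    // finally block below, since only the sort itself is of interest.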
    Path outDir = new Path(
        LargeSorter.class.getName() + System.currentTimeMillis());

    Configuration conf = getConf();
    verifyNotZero(conf, MBS_PER_MAP);
    verifyNotZero(conf, NUM_MAP_TASKS);

    conf.setInt(MRJobConfig.NUM_MAPS, conf.getInt(NUM_MAP_TASKS, 2));

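    // Give each map container at least twice io.sort.mb of memory so the
    // sort buffer fits, and leave roughly 200 MB of non-heap headroom
    // when sizing the JVM max heap below.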
    int ioSortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 512);
    int mapMb = Math.max(2 * ioSortMb, conf.getInt(MRJobConfig.MAP_MEMORY_MB,
        MRJobConfig.DEFAULT_MAP_MEMORY_MB));
    conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
    conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");

    Job job = Job.getInstance(conf);
    job.setJarByClass(LargeSorter.class);
    job.setJobName("large-sorter");
    FileOutputFormat.setOutputPath(job, outDir);
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setInputFormatClass(RandomInputFormat.class);
    job.setMapperClass(RandomMapper.class);
    job.setReducerClass(Discarder.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS, 1));

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    int ret = 1;
    try {
      ret = job.waitForCompletion(true) ? 0 : 1;
    } finally {
      FileSystem.get(conf).delete(outDir, true);
    }
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " + 
                       (endTime.getTime() - startTime.getTime()) /1000 + 
                       " seconds.");
    
    return ret;
  }
  
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new LargeSorter(), args);
    System.exit(res);
  }

}