TestMROutputFormat.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class TestMROutputFormat {

  @Test
  public void testJobSubmission() throws Exception {
    JobConf conf = new JobConf();
    Job job = new Job(conf);
    job.setInputFormatClass(TestInputFormat.class);
    job.setMapperClass(TestMapper.class);
    job.setOutputFormatClass(TestOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    job.waitForCompletion(true);
    assertTrue(job.isSuccessful());
  }
  
  public static class TestMapper
  extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
    public void map(IntWritable key, IntWritable value, Context context) 
    throws IOException, InterruptedException {
      context.write(key, value);
    }
  }
}

/**
 * Input format used by the test job. It exposes exactly one
 * {@link TestInputSplit}, and the record reader for that split produces a
 * single (0, 0) record before reporting end of input.
 */
class TestInputFormat extends InputFormat<IntWritable, IntWritable> {

  /**
   * Creates a reader that yields one record and then stops.
   *
   * @param split   the split to read; not consulted by this reader
   * @param context the task context; not consulted by this reader
   * @return a reader producing a single (0, 0) key/value pair
   */
  @Override
  public RecordReader<IntWritable, IntWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    return new RecordReader<IntWritable, IntWritable>() {

      // Set once the single record has been handed out by nextKeyValue().
      private boolean done = false;

      @Override
      public void close() throws IOException {
        // Nothing to release.
      }

      @Override
      public IntWritable getCurrentKey() throws IOException,
          InterruptedException {
        return new IntWritable(0);
      }

      @Override
      public IntWritable getCurrentValue() throws IOException,
          InterruptedException {
        return new IntWritable(0);
      }

      /**
       * Reports the fraction of input consumed: 0 before the single record
       * has been read and 1 afterwards. (The previous implementation had
       * the two values swapped.)
       */
      @Override
      public float getProgress() throws IOException, InterruptedException {
        return done ? 1.0f : 0.0f;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // No setup required.
      }

      /**
       * Returns {@code true} on the first call only, so exactly one record
       * is delivered.
       */
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!done) {
          done = true;
          return true;
        }
        return false;
      }
    };
  }

  /**
   * Returns a list holding a single {@link TestInputSplit}.
   *
   * @param context the job context; not consulted
   * @return a one-element list of splits
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException,
      InterruptedException {
    List<InputSplit> list = new ArrayList<InputSplit>();
    list.add(new TestInputSplit());
    return list;
  }
}

/**
 * Stateless input split for the test job: it reports a fixed length of 1,
 * a single location of "localhost", and serializes to nothing.
 */
class TestInputSplit extends InputSplit implements Writable {

  /** @return the constant length 1 */
  @Override
  public long getLength() throws IOException, InterruptedException {
    return 1L;
  }

  /** @return a one-element array containing "localhost" */
  @Override
  public String[] getLocations() throws IOException, InterruptedException {
    return new String[] {"localhost"};
  }

  /** Reads nothing; the split carries no fields. */
  @Override
  public void readFields(DataInput in) throws IOException {
    // Intentionally empty.
  }

  /** Writes nothing; the split carries no fields. */
  @Override
  public void write(DataOutput out) throws IOException {
    // Intentionally empty.
  }
}

/**
 * Output format for the test job. {@link #checkOutputSpecs} sets a boolean
 * flag on the configuration injected through {@link #setConf}, and
 * {@link #getRecordWriter} asserts that the flag is visible in the task's
 * configuration. Records written through the returned writer are discarded
 * and the committer performs no work.
 */
class TestOutputFormat extends OutputFormat<IntWritable, IntWritable>
    implements Configurable {

  /** Name of the flag set in checkOutputSpecs and verified in getRecordWriter. */
  public static final String TEST_CONFIG_NAME = "mapred.test.jobsubmission";

  // Configuration handed in via setConf(); mutated by checkOutputSpecs().
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Marks the injected configuration by setting {@link #TEST_CONFIG_NAME}
   * to {@code true}.
   *
   * @param context the job context; not consulted
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    conf.setBoolean(TEST_CONFIG_NAME, true);
  }

  /**
   * @param context the task attempt context; not consulted
   * @return a committer whose operations all do nothing
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new NoOpCommitter();
  }

  /**
   * Asserts that {@link #TEST_CONFIG_NAME} is {@code true} in the task's
   * configuration, then returns a writer that drops every record.
   * NOTE(review): presumably this checks that the change made in
   * checkOutputSpecs reached the submitted job configuration - confirm.
   *
   * @param context the task attempt context whose configuration is checked
   * @return a writer that ignores all records
   */
  @Override
  public RecordWriter<IntWritable, IntWritable> getRecordWriter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    boolean flagVisible =
        context.getConfiguration().getBoolean(TEST_CONFIG_NAME, false);
    assertTrue(flagVisible);
    return new DiscardingWriter();
  }

  /** Committer with empty job/task hooks that never requests a task commit. */
  private static final class NoOpCommitter extends OutputCommitter {

    @Override
    public void setupJob(JobContext jobContext) throws IOException {
    }

    @Override
    public void setupTask(TaskAttemptContext taskContext) throws IOException {
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext taskContext)
        throws IOException {
      return false;
    }

    @Override
    public void commitTask(TaskAttemptContext taskContext) throws IOException {
    }

    @Override
    public void abortTask(TaskAttemptContext taskContext) throws IOException {
    }
  }

  /** Record writer that ignores every write and has nothing to close. */
  private static final class DiscardingWriter
      extends RecordWriter<IntWritable, IntWritable> {

    @Override
    public void write(IntWritable key, IntWritable value) throws IOException,
        InterruptedException {
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
    }
  }
}