TestReduceTask.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * This test exercises the ValueIterator.
 */
public class TestReduceTask {

  static class NullProgress implements Progressable {
    public void progress() { }
  }

  private static class Pair {
    String key;
    String value;
    Pair(String k, String v) {
      key = k;
      value = v;
    }
  }
  private static Pair[][] testCases =
    new Pair[][]{
      new Pair[]{
                 new Pair("k1", "v1"),
                 new Pair("k2", "v2"),
                 new Pair("k3", "v3"),
                 new Pair("k3", "v4"),
                 new Pair("k4", "v5"),
                 new Pair("k5", "v6"),
      },
      new Pair[]{
                 new Pair("", "v1"),
                 new Pair("k1", "v2"),
                 new Pair("k2", "v3"),
                 new Pair("k2", "v4"),
      },
      new Pair[] {},
      new Pair[]{
                 new Pair("k1", "v1"),
                 new Pair("k1", "v2"),
                 new Pair("k1", "v3"),
                 new Pair("k1", "v4"),
      }
    };
  
  public void runValueIterator(Path tmpDir, Pair[] vals, 
                               Configuration conf, 
                               CompressionCodec codec) throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
    Path path = new Path(tmpDir, "data.in");
    IFile.Writer<Text, Text> writer = 
      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                   codec, null);
    for(Pair p: vals) {
      writer.append(new Text(p.key), new Text(p.value));
    }
    writer.close();
    
    @SuppressWarnings("unchecked")
    RawKeyValueIterator rawItr = 
      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                   false, conf.getInt(JobContext.IO_SORT_FACTOR, 100), tmpDir, 
                   new Text.Comparator(), new NullProgress(), null, null, null);
    @SuppressWarnings("unchecked") // WritableComparators are not generic
    ReduceTask.ValuesIterator valItr = 
      new ReduceTask.ValuesIterator<Text,Text>(rawItr,
          WritableComparator.get(Text.class), Text.class, Text.class,
          conf, new NullProgress());
    int i = 0;
    while (valItr.more()) {
      Object key = valItr.getKey();
      String keyString = key.toString();
      // make sure it matches!
      assertEquals(vals[i].key, keyString);
      // must have at least 1 value!
      assertTrue(valItr.hasNext());
      while (valItr.hasNext()) {
        String valueString = valItr.next().toString();
        // make sure the values match
        assertEquals(vals[i].value, valueString);
        // make sure the keys match
        assertEquals(vals[i].key, valItr.getKey().toString());
        i += 1;
      }
      // make sure the key hasn't changed under the hood
      assertEquals(keyString, valItr.getKey().toString());
      valItr.nextKey();
    }
    assertEquals(vals.length, i);
    // make sure we have progress equal to 1.0
    assertEquals(1.0f, rawItr.getProgress().get(),0.0000);
  }

  @Test
  public void testValueIterator() throws Exception {
    Path tmpDir = new Path("build/test/test.reduce.task");
    Configuration conf = new Configuration();
    for (Pair[] testCase: testCases) {
      runValueIterator(tmpDir, testCase, conf, null);
    }
  }

  @Test
  public void testValueIteratorWithCompression() throws Exception {
    Path tmpDir = new Path("build/test/test.reduce.task.compression");
    Configuration conf = new Configuration();
    DefaultCodec codec = new DefaultCodec();
    codec.setConf(conf);
    for (Pair[] testCase: testCases) {
      runValueIterator(tmpDir, testCase, conf, codec);
    }
  }
}