MultipleOutputs.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 * 
 * <p> 
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 * 
 * <p>
 * Case two: writing data to different files provided by the user.
 * </p>
 * 
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output names. These count the number of
 * records written to each output name.
 * </p>
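 *
 * <p>
 * For example, a minimal sketch of enabling and reading these counters
 * (assuming a named output <code>"text"</code> was added to the job):
 * </p>
 * <pre>
 * MultipleOutputs.setCountersEnabled(job, true);
 * ...
 * job.waitForCompletion(true);
 * // the counters group is the MultipleOutputs class name
 * long textRecords = job.getCounters()
 *     .findCounter(MultipleOutputs.class.getName(), "text").getValue();
 * </pre>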
 * 
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = new Job();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 *   LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 * 
 * public class MOReduce extends
 *     Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
 *   private MultipleOutputs mos;
 *
 *   public void setup(Context context) {
 *     ...
 *     mos = new MultipleOutputs(context);
 *   }
 *
 *   public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 *       Context context)
 *       throws IOException, InterruptedException {
 *     ...
 *     mos.write("text", key, new Text("Hello"));
 *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 *     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 *     ...
 *   }
 *
 *   public void cleanup(Context context)
 *       throws IOException, InterruptedException {
 *     mos.close();
 *     ...
 *   }
 * }
 * </pre>
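 *
 * <p>
 * Usage in a Mapper is analogous. A minimal sketch (the key/value types
 * here are illustrative):
 * </p>
 * <pre>
 * public class MOMap extends Mapper&lt;LongWritable, Text, Text, Text&gt; {
 *   private MultipleOutputs&lt;Text, Text&gt; mos;
 *
 *   public void setup(Context context) {
 *     mos = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   }
 *
 *   public void map(LongWritable key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     // route the record to the named output 'text'
 *     mos.write("text", key, value);
 *   }
 *
 *   public void cleanup(Context context)
 *       throws IOException, InterruptedException {
 *     mos.close();
 *   }
 * }
 * </pre>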
 * 
 * <p>
 * When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat
 * from the old Hadoop API, i.e. output can be written from the Reducer to more than one location.
 * </p>
 * 
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code> to write key and 
 * value to a path specified by <code>baseOutputPath</code>, with no need to specify a named output.
 * <b>Warning</b>: when the baseOutputPath passed to MultipleOutputs.write
 * is a path that resolves outside of the final job output directory, the
 * directory is created immediately and then persists through subsequent
 * task retries, breaking the concept of output committing:
 * </p>
 * 
 * <pre>
 * private MultipleOutputs&lt;Text, Text&gt; out;
 * 
 * public void setup(Context context) {
 *   out = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   ...
 * }
 * 
 * public void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
 *     throws IOException, InterruptedException {
 *   for (Text t : values) {
 *     out.write(key, t, generateFileName(&lt;<i>parameter list...</i>&gt;));
 *   }
 * }
 * 
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 *   out.close();
 * }
 * </pre>
 * 
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom path to your results.
 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system.
 * Also, append "part" or a similar base name to your custom-generated path; otherwise your output
 * file names will consist only of the framework-generated suffix (-r-00000, -r-00001, etc.).
 * No call to <code>context.write()</code> is necessary. See the example <code>generateFileName()</code> code below.
 * </p>
 * 
 * <pre>
 * private String generateFileName(Text k) {
 *   // expect Text k in format "Surname|Forename"
 *   String[] kStr = k.toString().split("\\|");
 *   
 *   String sName = kStr[0];
 *   String fName = kStr[1];
 *
 *   // example for k = Smith|John
 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 *   return sName + "/" + fName;
 * }
 * </pre>
 * 
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-r-00000.
 * To prevent this, use <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code> in your Hadoop job configuration.
 * </p>
 * 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX = 
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String COUNTERS_ENABLED = 
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
  private static final Logger LOG =
      LoggerFactory.getLogger(MultipleOutputs.class);

  /**
   * Cache for the taskContexts
   */
  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
  /**
   * Cached TaskAttemptContext which uses the job's configured settings
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is a valid token.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

  /**
   * Checks if an output name is valid.
   *
   * The name cannot be the name used for the default output.
   * @param outputPath base output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }
  
  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output Name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
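   * For example (illustrative; names may contain letters and numbers only):
   * <pre>
   * MultipleOutputs.addNamedOutput(job, "errors", TextOutputFormat.class,
   *     LongWritable.class, Text.class);
   * // invalid: "error-log" (contains '-'), "part" (reserved)
   * </pre>
   *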
   * @param job               job to add the named output
   * @param namedOutput       named output name; it has to be a word of
   *                          letters and numbers only, and cannot be the word
   *                          'part', as that is reserved for the default
   *                          output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   * 
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job    job  to enable counters
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled.
   * By default these counters are disabled.
   *
   * @param job    the job 
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  @VisibleForTesting
  synchronized void setRecordWriters(Map<String, RecordWriter<?, ?>> recordWriters) {
    this.recordWriters = recordWriters;
  }

  /**
   * Wraps RecordWriter to increment counters. 
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value) 
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context) 
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;
  
  /**
   * Creates and initializes multiple outputs support.
   * It should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * The output path is a unique file generated for the namedOutput,
   * for example {namedOutput}-(m|r)-{part-number}.
   * 
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   * 
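   * For example (illustrative; '/' characters in baseOutputPath create
   * directory levels):
   * <pre>
   * // records land in files named dir1/dir2/file-r-nnnnn under the
   * // job output directory
   * mos.write("text", key, value, "dir1/dir2/file");
   * </pre>
   *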
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   * 
   * @param key       the key
   * @param value     the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext = 
        new TaskAttemptContextImpl(context.getConfiguration(), 
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

  // By being synchronized, MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName) 
      throws IOException, InterruptedException {
    
    // look for record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);
    
    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }
 
      // if counters are enabled, wrap the writer with context 
      // to increment counters 
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }
      
      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

   // Create a taskAttemptContext for the named output with 
   // output format and output key/value types put in the context
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }
    
    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = Job.getInstance(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
        .getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }
    
    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   * 
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   * 
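   * <p>
   * Closing happens in a small thread pool. A sketch of tuning its size via
   * the <code>MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT</code> key, set
   * before job submission (the value 8 is illustrative):
   * </p>
   * <pre>
   * job.getConfiguration().setInt(
   *     MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT, 8);
   * </pre>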
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
    AtomicBoolean encounteredException = new AtomicBoolean(false);
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
        .setUncaughtExceptionHandler(((t, e) -> {
          LOG.error("Thread " + t + " failed unexpectedly", e);
          encounteredException.set(true);
        })).build();
    ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);

    List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());

    // Close each cached record writer in its own callable; a failure is
    // logged and flagged rather than aborting the remaining closes.
    for (RecordWriter writer : recordWriters.values()) {
      callableList.add(() -> {
        try {
          writer.close(context);
        } catch (IOException e) {
          LOG.error("Error while closing MultipleOutput file", e);
          encounteredException.set(true);
        }
        return null;
      });
    }
    try {
      // Run all close callables to completion on the pool.
      executorService.invokeAll(callableList);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while closing outputs", e);
      Thread.currentThread().interrupt();
    } finally {
      executorService.shutdown();
    }

    if (encounteredException.get()) {
      throw new IOException(
          "One or more threads encountered exception during close. See prior errors.");
    }
  }
}