SortedRanges.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps the Ranges sorted by startIndex.
 * The added ranges are always ensured to be non-overlapping.
 * Provides the SkipRangeIterator, which skips the Ranges 
 * stored in this object.
 */
class SortedRanges implements Writable{
  
  private static final Logger LOG =
      LoggerFactory.getLogger(SortedRanges.class);
  
  private TreeSet<Range> ranges = new TreeSet<Range>();
  private long indicesCount;
  
  /**
   * Get Iterator which skips the stored ranges.
   * The Iterator.next() call return the index starting from 0.
   * @return SkipRangeIterator
   */
  synchronized SkipRangeIterator skipRangeIterator(){
    return new SkipRangeIterator(ranges.iterator());
  }
  
  /**
   * Get the no of indices stored in the ranges.
   * @return indices count
   */
  synchronized long getIndicesCount() {
    return indicesCount;
  }
  
  /**
   * Get the sorted set of ranges.
   * @return ranges
   */
  synchronized SortedSet<Range> getRanges() {
  	return ranges;
 	}
  
  /**
   * Add the range indices. It is ensured that the added range 
   * doesn't overlap the existing ranges. If it overlaps, the 
   * existing overlapping ranges are removed and a single range 
   * having the superset of all the removed ranges and this range 
   * is added. 
   * If the range is of 0 length, doesn't do anything.
   * @param range Range to be added.
   */
  synchronized void add(Range range){
    if(range.isEmpty()) {
      return;
    }
    
    long startIndex = range.getStartIndex();
    long endIndex = range.getEndIndex();
    //make sure that there are no overlapping ranges
    SortedSet<Range> headSet = ranges.headSet(range);
    if(headSet.size()>0) {
      Range previousRange = headSet.last();
      LOG.debug("previousRange "+previousRange);
      if(startIndex<previousRange.getEndIndex()) {
        //previousRange overlaps this range
        //remove the previousRange
        if(ranges.remove(previousRange)) {
          indicesCount-=previousRange.getLength();
        }
        //expand this range
        startIndex = previousRange.getStartIndex();
        endIndex = endIndex>=previousRange.getEndIndex() ?
                          endIndex : previousRange.getEndIndex();
      }
    }
    
    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
    while(tailSetIt.hasNext()) {
      Range nextRange = tailSetIt.next();
      LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
          "  endIndex:"+endIndex);
      if(endIndex>=nextRange.getStartIndex()) {
        //nextRange overlaps this range
        //remove the nextRange
        tailSetIt.remove();
        indicesCount-=nextRange.getLength();
        if(endIndex<nextRange.getEndIndex()) {
          //expand this range
          endIndex = nextRange.getEndIndex();
          break;
        }
      } else {
        break;
      }
    }
    add(startIndex,endIndex);
  }
  
  /**
   * Remove the range indices. If this range is  
   * found in existing ranges, the existing ranges 
   * are shrunk.
   * If range is of 0 length, doesn't do anything.
   * @param range Range to be removed.
   */
  synchronized void remove(Range range) {
    if(range.isEmpty()) {
      return;
    }
    long startIndex = range.getStartIndex();
    long endIndex = range.getEndIndex();
    //make sure that there are no overlapping ranges
    SortedSet<Range> headSet = ranges.headSet(range);
    if(headSet.size()>0) {
      Range previousRange = headSet.last();
      LOG.debug("previousRange "+previousRange);
      if(startIndex<previousRange.getEndIndex()) {
        //previousRange overlaps this range
        //narrow down the previousRange
        if(ranges.remove(previousRange)) {
          indicesCount-=previousRange.getLength();
          LOG.debug("removed previousRange "+previousRange);
        }
        add(previousRange.getStartIndex(), startIndex);
        if(endIndex<=previousRange.getEndIndex()) {
          add(endIndex, previousRange.getEndIndex());
        }
      }
    }
    
    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
    while(tailSetIt.hasNext()) {
      Range nextRange = tailSetIt.next();
      LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
          "  endIndex:"+endIndex);
      if(endIndex>nextRange.getStartIndex()) {
        //nextRange overlaps this range
        //narrow down the nextRange
        tailSetIt.remove();
        indicesCount-=nextRange.getLength();
        if(endIndex<nextRange.getEndIndex()) {
          add(endIndex, nextRange.getEndIndex());
          break;
        }
      } else {
        break;
      }
    }
  }
  
  private void add(long start, long end) {
    if(end>start) {
      Range recRange = new Range(start, end-start);
      ranges.add(recRange);
      indicesCount+=recRange.getLength();
      LOG.debug("added "+recRange);
    }
  }
  
  public synchronized void readFields(DataInput in) throws IOException {
    indicesCount = in.readLong();
    ranges = new TreeSet<Range>();
    int size = in.readInt();
    for(int i=0;i<size;i++) {
      Range range = new Range();
      range.readFields(in);
      ranges.add(range);
    }
  }

  public synchronized void write(DataOutput out) throws IOException {
    out.writeLong(indicesCount);
    out.writeInt(ranges.size());
    Iterator<Range> it = ranges.iterator();
    while(it.hasNext()) {
      Range range = it.next();
      range.write(out);
    }
  }
  
  public String toString() {
    StringBuilder sb = new StringBuilder();
    Iterator<Range> it = ranges.iterator();
    while(it.hasNext()) {
      Range range = it.next();
      sb.append(range.toString()+"\n");
    }
    return sb.toString();
  }
  
  /**
   * Index Range. Comprises of start index and length.
   * A Range can be of 0 length also. The Range stores indices 
   * of type long.
   */
  static class Range implements Comparable<Range>, Writable{
    private long startIndex;
    private long length;
        
    Range(long startIndex, long length) {
      if(length<0) {
        throw new RuntimeException("length can't be negative");
      }
      this.startIndex = startIndex;
      this.length = length;
    }
    
    Range() {
      this(0,0);
    }
    
    /**
     * Get the start index. Start index in inclusive.
     * @return startIndex. 
     */
    long getStartIndex() {
      return startIndex;
    }
    
    /**
     * Get the end index. End index is exclusive.
     * @return endIndex.
     */
    long getEndIndex() {
      return startIndex + length;
    }
    
   /**
    * Get Length.
    * @return length
    */
    long getLength() {
      return length;
    }
    
    /**
     * Range is empty if its length is zero.
     * @return <code>true</code> if empty
     *         <code>false</code> otherwise.
     */
    boolean isEmpty() {
      return length==0;
    }
    
    public boolean equals(Object o) {
      if (o instanceof Range) {
        Range range = (Range)o;
        return startIndex==range.startIndex &&
        length==range.length;
      }
      return false;
    }
    
    public int hashCode() {
      return Long.valueOf(startIndex).hashCode() +
          Long.valueOf(length).hashCode();
    }
    
    public int compareTo(Range o) {
      // Ensure sgn(x.compareTo(y) == -sgn(y.compareTo(x))
      return this.startIndex < o.startIndex ? -1 :
          (this.startIndex > o.startIndex ? 1 :
          (this.length < o.length ? -1 :
          (this.length > o.length ? 1 : 0)));
    }

    public void readFields(DataInput in) throws IOException {
      startIndex = in.readLong();
      length = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
      out.writeLong(startIndex);
      out.writeLong(length);
    }
    
    public String toString() {
      return startIndex +":" + length;
    }    
  }
  
  /**
   * Index Iterator which skips the stored ranges.
   */
  static class SkipRangeIterator implements Iterator<Long> {
    Iterator<Range> rangeIterator;
    Range range = new Range();
    long next = -1;
    
    /**
     * Constructor
     * @param rangeIterator the iterator which gives the ranges.
     */
    SkipRangeIterator(Iterator<Range> rangeIterator) {
      this.rangeIterator = rangeIterator;
      doNext();
    }
    
    /**
     * Returns true till the index reaches Long.MAX_VALUE.
     * @return <code>true</code> next index exists.
     *         <code>false</code> otherwise.
     */
    public synchronized boolean hasNext() {
      return next<Long.MAX_VALUE;
    }
    
    /**
     * Get the next available index. The index starts from 0.
     * @return next index
     */
    public synchronized Long next() {
      long ci = next;
      doNext();
      return ci;
    }
    
    private void doNext() {
      next++;
      LOG.debug("currentIndex "+next +"   "+range);
      skipIfInRange();
      while(next>=range.getEndIndex() && rangeIterator.hasNext()) {
        range = rangeIterator.next();
        skipIfInRange();
      }
    }
    
    private void skipIfInRange() {
      if(next>=range.getStartIndex() && 
          next<range.getEndIndex()) {
        //need to skip the range
        LOG.warn("Skipping index " + next +"-" + range.getEndIndex());
        next = range.getEndIndex();
        
      }
    }
    
    /**
     * Get whether all the ranges have been skipped.
     * @return <code>true</code> if all ranges have been skipped.
     *         <code>false</code> otherwise.
     */
    synchronized boolean skippedAllRanges() {
      return !rangeIterator.hasNext() && next>range.getEndIndex();
    }
    
    /**
     * Remove is not supported. Doesn't apply.
     */
    public void remove() {
      throw new UnsupportedOperationException("remove not supported.");
    }
    
  }

}