ConcatenatedInputFilesDemuxer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

/**
 * An {@link InputDemuxer} for one physical input that holds several logical
 * files laid end to end. Every logical file is introduced by a header of the
 * form <code>\f!!FILE=&lt;file name&gt;!!\n</code>; the bytes between one
 * header and the next (or the end of the input) are that file's contents.
 *
 * All streams handed out by {@link #getNext()} are the same underlying
 * object, so a segment must be read to its end before the next call to
 * {@link #getNext()}.
 */
public class ConcatenatedInputFilesDemuxer implements InputDemuxer {
  /** Initial size of the look-ahead used to parse one header line. */
  private static final int MAXIMUM_HEADER_LINE_LENGTH = 500;

  /** Text that opens a header line. */
  private static final String FILE_MARKER = "\f!!FILE=";

  /** Text that closes a header line. */
  private static final String FILE_END_MARKER = "!!\n";

  /** Name of the bound path; non-null only while a binding is open. */
  private String name;

  /** The delimited view of the bound input; null when nothing is bound. */
  private DelimitedInputStream input;

  /** File name parsed ahead of time by bindTo, handed out by getNext. */
  private String knownNextFileName = null;

  /**
   * Binds this demuxer to <code>path</code> and parses the first header.
   *
   * If the input does not start with a header, or parsing it fails, the
   * input is closed again and {@link #getNext()} will return
   * <code>null</code>.
   *
   * @param path the input to open
   * @param conf the configuration passed on when opening <code>path</code>
   * @throws IOException if the input cannot be opened or read
   */
  @Override
  public void bindTo(Path path, Configuration conf) throws IOException {
    if (name != null) { // re-binding before the previous one was consumed.
      close();
    }
    name = path.getName();

    // DelimitedInputStream buffers internally, so pass the raw stream.
    InputStream underlyingInput =
        new PossiblyDecompressedInputStream(path, conf);

    input =
        new DelimitedInputStream(underlyingInput, FILE_MARKER,
            FILE_END_MARKER);

    boolean headerFound = false;

    try {
      knownNextFileName = input.nextFileName();
      headerFound = knownNextFileName != null;
    } finally {
      /*
       * We handle files in specialized formats by trying their demuxers
       * first, not by failing here. Either way the input must not stay open
       * when there is nothing to hand out.
       */
      if (!headerFound) {
        close();
      }
    }
  }

  /**
   * Returns the next logical file.
   *
   * @return the file name paired with a stream over that file's contents, or
   *         <code>null</code> if nothing is bound or no further header
   *         follows
   * @throws IOException if reading the next header fails
   */
  @Override
  public Pair<String, InputStream> getNext() throws IOException {
    if (input == null) {
      return null;
    }

    String fileName = knownNextFileName;

    if (fileName != null) {
      // bindTo already consumed this header
      knownNextFileName = null;
    } else {
      fileName = input.nextFileName();

      if (fileName == null) {
        return null;
      }
    }

    return new Pair<String, InputStream>(fileName, input);
  }

  /**
   * Closes the bound input, whether or not it was read to its end, and
   * forgets the binding. Calling this with nothing bound does nothing.
   *
   * @throws IOException if closing the input fails
   */
  @Override
  public void close() throws IOException {
    DelimitedInputStream toClose = input;

    // drop the state first so a failing close cannot leave a stale binding
    input = null;
    knownNextFileName = null;
    name = null;

    if (toClose != null) {
      toClose.closeUnderlying();
    }
  }

  /**
   * A simple wrapper class to make any input stream delimited. It has an extra
   * method, nextFileName.
   * 
   * The input stream should have lines that look like
   * <marker><filename><endmarker> . The text <marker> should not occur
   * elsewhere in the file. The text <endmarker> should not occur in a file
   * name; in fact a file name must not contain the first byte of <endmarker>
   * or a newline.
   * 
   * Reads report end of stream (-1) when they reach a marker, without
   * consuming it, so that nextFileName can parse the header that follows.
   */
  static class DelimitedInputStream extends InputStream {
    /** Size of the internal buffer and of the default mark limit. */
    private static final int BUFFER_SIZE = 10000;

    private final InputStream input;

    /** Set once the underlying input is exhausted or has been closed. */
    private boolean endSeen = false;

    private final byte[] markerBytes;

    private final byte[] fileMarkerBuffer;

    private final byte[] endMarkerBytes;

    private final byte[] fileEndMarkerBuffer;

    /**
     * Constructor.
     * 
     * @param input the stream to wrap; it is buffered here
     * @param fileMarker the text that opens a header line; must not be empty
     * @param fileEndMarker the text that closes a header line; must not be
     *          empty
     * @throws IllegalArgumentException if either marker is empty
     */
    public DelimitedInputStream(InputStream input, String fileMarker,
        String fileEndMarker) {
      if (fileMarker.isEmpty() || fileEndMarker.isEmpty()) {
        throw new IllegalArgumentException("Markers must not be empty.");
      }

      this.input = new BufferedInputStream(input, BUFFER_SIZE);
      this.input.mark(BUFFER_SIZE);
      // markers are converted with the platform default charset
      this.markerBytes = fileMarker.getBytes();
      this.fileMarkerBuffer = new byte[this.markerBytes.length];
      this.endMarkerBytes = fileEndMarker.getBytes();
      this.fileEndMarkerBuffer = new byte[this.endMarkerBytes.length];
    }

    /**
     * Reads one byte of the current segment.
     * 
     * @return the byte read, or -1 at a marker or at the end of the input
     */
    @Override
    public int read() throws IOException {
      if (endSeen) {
        return -1;
      }

      input.mark(BUFFER_SIZE);

      int result = input.read();

      if (result < 0) {
        endSeen = true;
        return result;
      }

      // read() yields 0..255 while the marker bytes are signed
      if ((byte) result != markerBytes[0]) {
        return result;
      }

      // this might be a marker line
      input.reset();

      int markerReadResult =
          readFully(fileMarkerBuffer, 0, fileMarkerBuffer.length);

      input.reset();

      if (markerReadResult == fileMarkerBuffer.length
          && regionMatches(fileMarkerBuffer, 0, markerBytes, 0,
              markerBytes.length)) {
        // leave the marker unread for nextFileName
        return -1;
      }

      return input.read();
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.io.InputStream#read(byte[], int, int)
     * 
     * This does SLIGHTLY THE WRONG THING.
     * 
     * If we run off the end of the segment then the input buffer will be
     * dirtied beyond the point where we claim to have read. If this turns out
     * to be a problem, save that data somewhere and restore it if needed.
     * 
     * Returns -1 when the segment ends right at the current position, either
     * because a marker starts there or because the input is exhausted.
     */
    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException {
      if (endSeen) {
        return -1;
      }

      // room for the data plus the look-ahead needed to finish a marker
      input.mark(length + markerBytes.length + 10);

      int dataSeen = input.read(buffer, offset, length);

      if (dataSeen < 0) {
        // same bookkeeping as the single-byte read
        endSeen = true;
        return -1;
      }

      int end = offset + dataSeen;

      // bytes following the data, fetched at most once per call
      byte[] lookahead = null;
      int lookaheadLength = 0;

      // search for an instance of a file marker
      for (int i = offset; i < end; ++i) {
        int inBuffer = Math.min(markerBytes.length, end - i);

        if (!regionMatches(buffer, i, markerBytes, 0, inBuffer)) {
          continue;
        }

        // see if we have only a prefix of the markerBytes
        int uncheckedMarkerCharCount = markerBytes.length - inBuffer;

        if (uncheckedMarkerCharCount > 0) {
          if (lookahead == null) {
            lookahead = new byte[markerBytes.length - 1];
            lookaheadLength = readFully(lookahead, 0, lookahead.length);
          }

          if (lookaheadLength < uncheckedMarkerCharCount
              || !regionMatches(lookahead, 0, markerBytes, inBuffer,
                  uncheckedMarkerCharCount)) {
            // not a marker here; one may still start later in the tail
            continue;
          }
        }

        // a marker starts at i: hand out only what precedes it
        input.reset();

        if (i == offset) {
          return -1;
        }

        return input.read(buffer, offset, i - offset);
      }

      if (lookahead != null) {
        // undo the look-ahead; hand out exactly the bytes that were scanned
        input.reset();
        return input.read(buffer, offset, dataSeen);
      }

      return dataSeen;
    }

    @Override
    public int read(byte[] buffer) throws IOException {
      return read(buffer, 0, buffer.length);
    }

    /**
     * Closes the underlying input only once its end has been seen, so that
     * closing the stream after one segment leaves later segments readable.
     */
    @Override
    public void close() throws IOException {
      if (endSeen) {
        input.close();
      }
    }

    /**
     * Closes the underlying input unconditionally. Afterwards every read
     * returns -1 and nextFileName returns null.
     */
    void closeUnderlying() throws IOException {
      endSeen = true;
      input.close();
    }

    /**
     * Parses the header at the current position.
     * 
     * @return the file name from the header, with the input positioned just
     *         past the header; or null, with the input position unchanged,
     *         if no well-formed header starts here or the input has ended
     */
    String nextFileName() throws IOException {
      if (endSeen) {
        // nothing left, and the underlying input may already be closed
        return null;
      }

      return nextFileName(MAXIMUM_HEADER_LINE_LENGTH);
    }

    private String nextFileName(int bufferSize) throws IOException {
      // the line can't contain a newline and must contain a form feed
      byte[] buffer = new byte[bufferSize];

      // the re-read below may run past the look-ahead by the end marker
      input.mark(bufferSize + endMarkerBytes.length);

      int actualRead = readFully(buffer, 0, bufferSize);

      if (actualRead < markerBytes.length
          || !regionMatches(buffer, 0, markerBytes, 0, markerBytes.length)) {
        input.reset();
        return null;
      }

      for (int i = markerBytes.length; i < actualRead; ++i) {
        if (buffer[i] == endMarkerBytes[0]) {
          int nameLength = i - markerBytes.length;

          // gather the file name
          input.reset();
          // burn the marker
          if (readFully(buffer, 0, markerBytes.length) < markerBytes.length) {
            throw new IOException("Can't reread bytes I've read before.");
          }
          // get the file name
          if (readFully(buffer, 0, nameLength) < nameLength) {
            throw new IOException("Can't reread bytes I've read before.");
          }
          // burn the two exclamation points and the newline
          if (readFully(fileEndMarkerBuffer, 0, fileEndMarkerBuffer.length)
                  < fileEndMarkerBuffer.length
              || !regionMatches(fileEndMarkerBuffer, 0, endMarkerBytes, 0,
                  endMarkerBytes.length)) {
            input.reset();
            return null;
          }

          return new String(buffer, 0, nameLength);
        }

        if (buffer[i] == '\n') {
          // malformed header: give the bytes back like every other failure
          input.reset();
          return null;
        }
      }

      // we ran off the end. Was the buffer too short, or is this all there was?
      input.reset();

      if (actualRead < bufferSize) {
        return null;
      }

      return nextFileName(bufferSize * 2);
    }

    /**
     * Reads from the underlying input until <code>length</code> bytes have
     * arrived or the input ends.
     * 
     * @return the number of bytes stored, which is less than
     *         <code>length</code> only at the end of the input
     */
    private int readFully(byte[] target, int offset, int length)
        throws IOException {
      int total = 0;

      while (total < length) {
        int count = input.read(target, offset + total, length - total);

        if (count <= 0) {
          break;
        }

        total += count;
      }

      return total;
    }

    /**
     * Tells whether <code>count</code> bytes of <code>data</code>, starting at
     * <code>dataOffset</code>, equal the bytes of <code>expected</code>
     * starting at <code>expectedOffset</code>.
     */
    private static boolean regionMatches(byte[] data, int dataOffset,
        byte[] expected, int expectedOffset, int count) {
      for (int k = 0; k < count; ++k) {
        if (data[dataOffset + k] != expected[expectedOffset + k]) {
          return false;
        }
      }

      return true;
    }
  }

}