DataWriter.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.slive;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Random;

import static org.apache.hadoop.fs.slive.Constants.BYTES_PER_LONG;

/**
 * Class which handles generating data (creating and appending) along with
 * ensuring the correct data is written out for the given path name so that it
 * can be later verified
 */
class DataWriter {
  /**
   * Header size in bytes
   */
  private static final int HEADER_LENGTH = (BYTES_PER_LONG * 2);

  private int bufferSize;
  private Random rnd;

  /**
   * Class used to hold the number of bytes written and time taken for write
   * operations for callers to use
   */
  static class GenerateOutput {

    private long bytes;
    private long time;

    GenerateOutput(long bytesWritten, long timeTaken) {
      this.bytes = bytesWritten;
      this.time = timeTaken;
    }

    long getBytesWritten() {
      return bytes;
    }

    long getTimeTaken() {
      return time;
    }

    public String toString() {
      return "Wrote " + getBytesWritten() + " bytes " + " which took "
          + getTimeTaken() + " milliseconds";
    }
  }

  /**
   * Class used to hold a byte buffer and offset position for generating data
   */
  private static class GenerateResult {
    private long offset;
    private ByteBuffer buffer;

    GenerateResult(long offset, ByteBuffer buffer) {
      this.offset = offset;
      this.buffer = buffer;
    }

    long getOffset() {
      return offset;
    }

    ByteBuffer getBuffer() {
      return buffer;
    }
  }

  /**
   * What a header write output returns need the hash value to use and the time
   * taken to perform the write + bytes written
   */
  private static class WriteInfo {
    private long hashValue;
    private long bytesWritten;
    private long timeTaken;

    WriteInfo(long hashValue, long bytesWritten, long timeTaken) {
      this.hashValue = hashValue;
      this.bytesWritten = bytesWritten;
      this.timeTaken = timeTaken;
    }

    long getHashValue() {
      return hashValue;
    }

    long getTimeTaken() {
      return timeTaken;
    }

    long getBytesWritten() {
      return bytesWritten;
    }
  }

  /**
   * Inits with given buffer size (must be greater than bytes per long and a
   * multiple of bytes per long)
   * 
   * @param rnd
   *          random number generator to use for hash value creation
   * 
   * @param bufferSize
   *          size which must be greater than BYTES_PER_LONG and which also must
   *          be a multiple of BYTES_PER_LONG
   */
  DataWriter(Random rnd, int bufferSize) {
    if (bufferSize < BYTES_PER_LONG) {
      throw new IllegalArgumentException(
          "Buffer size must be greater than or equal to " + BYTES_PER_LONG);
    }
    if ((bufferSize % BYTES_PER_LONG) != 0) {
      throw new IllegalArgumentException("Buffer size must be a multiple of "
          + BYTES_PER_LONG);
    }
    this.bufferSize = bufferSize;
    this.rnd = rnd;
  }

  /**
   * Inits with default buffer size
   */
  DataWriter(Random rnd) {
    this(rnd, Constants.BUFFERSIZE);
  }

  /**
   * Generates a partial segment which is less than bytes per long size
   * 
   * @param byteAm
   *          the number of bytes to generate (less than bytes per long)
   * @param offset
   *          the staring offset
   * @param hasher
   *          hasher to use for generating data given an offset
   * 
   * @return GenerateResult containing new offset and byte buffer
   */
  private GenerateResult generatePartialSegment(int byteAm, long offset,
      DataHasher hasher) {
    if (byteAm > BYTES_PER_LONG) {
      throw new IllegalArgumentException(
          "Partial bytes must be less or equal to " + BYTES_PER_LONG);
    }
    if (byteAm <= 0) {
      throw new IllegalArgumentException(
          "Partial bytes must be greater than zero and not " + byteAm);
    }
    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
    buf.putLong(hasher.generate(offset));
    ByteBuffer allBytes = ByteBuffer.wrap(new byte[byteAm]);
    buf.rewind();
    for (int i = 0; i < byteAm; ++i) {
      allBytes.put(buf.get());
    }
    allBytes.rewind();
    return new GenerateResult(offset, allBytes);
  }

  /**
   * Generates a full segment (aligned to bytes per long) of the given byte
   * amount size
   * 
   * @param byteAm
   *          long aligned size
   * @param startOffset
   *          starting hash offset
   * @param hasher
   *          hasher to use for generating data given an offset
   * @return GenerateResult containing new offset and byte buffer
   */
  private GenerateResult generateFullSegment(int byteAm, long startOffset,
      DataHasher hasher) {
    if (byteAm <= 0) {
      throw new IllegalArgumentException(
          "Byte amount must be greater than zero and not " + byteAm);
    }
    if ((byteAm % BYTES_PER_LONG) != 0) {
      throw new IllegalArgumentException("Byte amount " + byteAm
          + " must be a multiple of " + BYTES_PER_LONG);
    }
    // generate all the segments
    ByteBuffer allBytes = ByteBuffer.wrap(new byte[byteAm]);
    long offset = startOffset;
    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
    for (long i = 0; i < byteAm; i += BYTES_PER_LONG) {
      buf.rewind();
      buf.putLong(hasher.generate(offset));
      buf.rewind();
      allBytes.put(buf);
      offset += BYTES_PER_LONG;
    }
    allBytes.rewind();
    return new GenerateResult(offset, allBytes);
  }

  /**
   * Writes a set of bytes to the output stream, for full segments it will write
   * out the complete segment but for partial segments, ie when the last
   * position does not fill up a full long then a partial set will be written
   * out containing the needed bytes from the expected full segment
   * 
   * @param byteAm
   *          the amount of bytes to write
   * @param startPos
   *          a BYTES_PER_LONG aligned start position
   * @param hasher
   *          hasher to use for generating data given an offset
   * @param out
   *          the output stream to write to
   * @return how many bytes were written
   * @throws IOException
   */
  private GenerateOutput writePieces(long byteAm, long startPos,
      DataHasher hasher, OutputStream out) throws IOException {
    if (byteAm <= 0) {
      return new GenerateOutput(0, 0);
    }
    if (startPos < 0) {
      startPos = 0;
    }
    int leftOver = (int) (byteAm % bufferSize);
    long fullPieces = byteAm / bufferSize;
    long offset = startPos;
    long bytesWritten = 0;
    long timeTaken = 0;
    // write the full pieces that fit in the buffer size
    for (long i = 0; i < fullPieces; ++i) {
      GenerateResult genData = generateFullSegment(bufferSize, offset, hasher);
      offset = genData.getOffset();
      ByteBuffer gBuf = genData.getBuffer();
      {
        byte[] buf = gBuf.array();
        long startTime = Timer.now();
        out.write(buf);
        if (Constants.FLUSH_WRITES) {
          out.flush();
        }
        timeTaken += Timer.elapsed(startTime);
        bytesWritten += buf.length;
      }
    }
    if (leftOver > 0) {
      ByteBuffer leftOverBuf = ByteBuffer.wrap(new byte[leftOver]);
      int bytesLeft = leftOver % BYTES_PER_LONG;
      leftOver = leftOver - bytesLeft;
      // collect the piece which do not fit in the buffer size but is
      // also greater or eq than BYTES_PER_LONG and a multiple of it
      if (leftOver > 0) {
        GenerateResult genData = generateFullSegment(leftOver, offset, hasher);
        offset = genData.getOffset();
        leftOverBuf.put(genData.getBuffer());
      }
      // collect any single partial byte segment
      if (bytesLeft > 0) {
        GenerateResult genData = generatePartialSegment(bytesLeft, offset,
            hasher);
        offset = genData.getOffset();
        leftOverBuf.put(genData.getBuffer());
      }
      // do the write of both
      leftOverBuf.rewind();
      {
        byte[] buf = leftOverBuf.array();
        long startTime = Timer.now();
        out.write(buf);
        if (Constants.FLUSH_WRITES) {
          out.flush();
        }
        timeTaken += Timer.elapsed(startTime);
        bytesWritten += buf.length;
      }
    }
    return new GenerateOutput(bytesWritten, timeTaken);
  }

  /**
   * Writes to a stream the given number of bytes specified
   * 
   * @param byteAm
   *          the file size in number of bytes to write
   * 
   * @param out
   *          the outputstream to write to
   * 
   * @return the number of bytes written + time taken
   * 
   * @throws IOException
   */
  GenerateOutput writeSegment(long byteAm, OutputStream out)
      throws IOException {
    long headerLen = getHeaderLength();
    if (byteAm < headerLen) {
      // not enough bytes to write even the header
      return new GenerateOutput(0, 0);
    }
    // adjust for header length
    byteAm -= headerLen;
    if (byteAm < 0) {
      byteAm = 0;
    }
    WriteInfo header = writeHeader(out, byteAm);
    DataHasher hasher = new DataHasher(header.getHashValue());
    GenerateOutput pRes = writePieces(byteAm, 0, hasher, out);
    long bytesWritten = pRes.getBytesWritten() + header.getBytesWritten();
    long timeTaken = header.getTimeTaken() + pRes.getTimeTaken();
    return new GenerateOutput(bytesWritten, timeTaken);
  }

  /**
   * Gets the header length
   * 
   * @return int
   */
  static int getHeaderLength() {
    return HEADER_LENGTH;
  }

  /**
   * Writes a header to the given output stream
   * 
   * @param os
   *          output stream to write to
   * 
   * @param fileSize
   *          the file size to write
   * 
   * @return WriteInfo
   * 
   * @throws IOException
   *           if a write failure occurs
   */
  WriteInfo writeHeader(OutputStream os, long fileSize) throws IOException {
    int headerLen = getHeaderLength();
    ByteBuffer buf = ByteBuffer.wrap(new byte[headerLen]);
    long hash = rnd.nextLong();
    buf.putLong(hash);
    buf.putLong(fileSize);
    buf.rewind();
    byte[] headerData = buf.array();
    long elapsed = 0;
    {
      long startTime = Timer.now();
      os.write(headerData);
      if (Constants.FLUSH_WRITES) {
        os.flush();
      }
      elapsed += Timer.elapsed(startTime);
    }
    return new WriteInfo(hash, headerLen, elapsed);
  }
}