DataVerifier.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.slive;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Reads header-prefixed data segments from an input stream and verifies the
 * segment contents against the values a {@link DataHasher} generates
 */
class DataVerifier {
  private static final int BYTES_PER_LONG = Constants.BYTES_PER_LONG;

  /** Maximum number of data bytes pulled from the stream in a single read */
  private final int bufferSize;

  /**
   * The output from verification: the number of chunks that matched what was
   * expected, the number of chunks that differed from what was expected, the
   * total number of bytes read and the accumulated time spent reading
   */
  static class VerifyOutput {
    private final long same;
    private final long different;
    private final long read;
    private final long readTime;

    VerifyOutput(long sameChunks, long differentChunks, long readBytes,
        long readTime) {
      this.same = sameChunks;
      this.different = differentChunks;
      this.read = readBytes;
      this.readTime = readTime;
    }

    /**
     * @return the accumulated read time (reported as milliseconds by
     *         {@link #toString()})
     */
    long getReadTime() {
      return this.readTime;
    }

    /**
     * @return the total bytes read (header bytes and data bytes)
     */
    long getBytesRead() {
      return this.read;
    }

    /**
     * @return the number of chunks that matched the expected value
     */
    long getChunksSame() {
      return same;
    }

    /**
     * @return the number of chunks that differed from the expected value
     */
    long getChunksDifferent() {
      return different;
    }

    @Override
    public String toString() {
      return "Bytes read = " + getBytesRead() + " same = " + getChunksSame()
          + " different = " + getChunksDifferent() + " in " + getReadTime()
          + " milliseconds";
    }

  }

  /**
   * Class used to hold the result of a read on a header
   */
  private static class ReadInfo {
    private final long byteAm;
    private final long hash;
    private final long timeTaken;
    private final long bytesRead;

    ReadInfo(long byteAm, long hash, long timeTaken, long bytesRead) {
      this.byteAm = byteAm;
      this.hash = hash;
      this.timeTaken = timeTaken;
      this.bytesRead = bytesRead;
    }

    /**
     * @return the data byte amount recorded in the header
     */
    long getByteAm() {
      return byteAm;
    }

    /**
     * @return the hash value recorded in the header
     */
    long getHashValue() {
      return hash;
    }

    /**
     * @return the time spent reading the header
     */
    long getTimeTaken() {
      return timeTaken;
    }

    /**
     * @return the number of bytes consumed by the header itself
     */
    long getBytesRead() {
      return bytesRead;
    }

  }

  /**
   * Storage class used to hold the chunks same and different counts that
   * result from verifying a single buffered read
   */
  private static class VerifyInfo {
    private final long same;
    private final long different;

    VerifyInfo(long same, long different) {
      this.same = same;
      this.different = different;
    }

    long getSame() {
      return same;
    }

    long getDifferent() {
      return different;
    }
  }

  /**
   * Inits with given buffer size (must be at least bytes per long and a
   * multiple of bytes per long)
   * 
   * @param bufferSize
   *          size which must be greater than or equal to BYTES_PER_LONG and
   *          which also must be a multiple of BYTES_PER_LONG
   * 
   * @throws IllegalArgumentException
   *           if the buffer size is smaller than BYTES_PER_LONG or is not a
   *           multiple of BYTES_PER_LONG
   */
  DataVerifier(int bufferSize) {
    if (bufferSize < BYTES_PER_LONG) {
      throw new IllegalArgumentException(
          "Buffer size must be greater than or equal to " + BYTES_PER_LONG);
    }
    if ((bufferSize % BYTES_PER_LONG) != 0) {
      throw new IllegalArgumentException("Buffer size must be a multiple of "
          + BYTES_PER_LONG);
    }
    this.bufferSize = bufferSize;
  }

  /**
   * Inits with the default buffer size
   */
  DataVerifier() {
    this(Constants.BUFFERSIZE);
  }

  /**
   * Verifies a buffer of a given size using the given start hash offset. The
   * buffer is consumed from its current position in chunks of BYTES_PER_LONG
   * bytes; each complete chunk is compared as a long against the hasher value
   * for the chunk's offset. A trailing partial chunk (when size is not a
   * multiple of BYTES_PER_LONG) is compared against the leading bytes of the
   * expected value and counts as a single chunk.
   * 
   * @param buf
   *          the buffer to verify (read from its current position)
   * @param size
   *          the number of bytes to be used in that buffer
   * @param startOffset
   *          the start hash offset
   * @param hasher
   *          the hasher to use for calculating expected values
   * 
   * @return VerifyInfo holding the number of chunks that matched and the
   *         number of chunks that differed
   */
  private VerifyInfo verifyBuffer(ByteBuffer buf, int size, long startOffset,
      DataHasher hasher) {
    if (size <= 0) {
      // nothing to look at
      return new VerifyInfo(0, 0);
    }
    byte[] chunk = new byte[BYTES_PER_LONG];
    // view over the scratch array so each chunk can be decoded as a long
    // independent of whatever byte order the incoming buffer is set to
    ByteBuffer chunkView = ByteBuffer.wrap(chunk);
    long hashOffset = startOffset;
    long chunksSame = 0;
    long chunksDifferent = 0;
    int fullChunks = size / BYTES_PER_LONG;
    for (int c = 0; c < fullChunks; ++c) {
      buf.get(chunk);
      long receivedData = chunkView.getLong(0);
      long expected = hasher.generate(hashOffset);
      hashOffset += BYTES_PER_LONG;
      if (receivedData == expected) {
        ++chunksSame;
      } else {
        ++chunksDifferent;
      }
    }
    // any left over??
    int tail = size % BYTES_PER_LONG;
    if (tail > 0) {
      // partial capture - only the bytes actually received can be compared,
      // so check them against the same-length prefix of the expected value
      buf.get(chunk, 0, tail);
      long expected = hasher.generate(hashOffset);
      byte[] expectedBytes = ByteBuffer.allocate(BYTES_PER_LONG)
          .putLong(expected).array();
      boolean matches = true;
      for (int i = 0; i < tail; ++i) {
        if (chunk[i] != expectedBytes[i]) {
          matches = false;
          break;
        }
      }
      if (matches) {
        ++chunksSame;
      } else {
        ++chunksDifferent;
      }
    }
    return new VerifyInfo(chunksSame, chunksDifferent);
  }

  /**
   * Determines the hash offset to use given a byte counter by rounding the
   * counter down to a multiple of BYTES_PER_LONG (negative counters are
   * treated as zero)
   * 
   * @param byteRead
   *          number of bytes already read in the current segment
   * 
   * @return offset position
   */
  private long determineOffset(long byteRead) {
    long nonNegative = Math.max(byteRead, 0L);
    return (nonNegative / BYTES_PER_LONG) * BYTES_PER_LONG;
  }

  /**
   * Verifies a given number of bytes from a file - less number of bytes may be
   * read if a header can not be read in due to the byte limit
   * 
   * @param byteAm
   *          the byte amount to limit to (should be less than or equal to file
   *          size)
   * 
   * @param in
   *          the input stream to read from
   * 
   * @return VerifyOutput with data about reads
   * 
   * @throws IOException
   *           if a read failure occurs
   * 
   * @throws BadFileException
   *           if a header holds a negative data amount or end of file is
   *           reached unexpectedly while reading data bytes
   */
  VerifyOutput verifyFile(long byteAm, DataInputStream in)
      throws IOException, BadFileException {
    return verifyBytes(byteAm, 0, in);
  }

  /**
   * Verifies a given number of bytes from a file - less number of bytes may be
   * read if a header can not be read in due to the byte limit. Reaching end of
   * file while reading a header ends verification normally; reaching end of
   * file while reading data bytes is an error.
   * 
   * @param byteAm
   *          the byte amount to limit to (should be less than or equal to file
   *          size); header bytes count against this limit
   * 
   * @param bytesRead
   *          the initial value of the bytes read counter (the stream itself is
   *          not repositioned)
   * 
   * @param in
   *          the input stream to read from
   * 
   * @return VerifyOutput with data about reads
   * 
   * @throws IOException
   *           if a read failure occurs
   * 
   * @throws BadFileException
   *           if a header holds a negative data amount or end of file is
   *           reached unexpectedly while reading data bytes
   */
  private VerifyOutput verifyBytes(long byteAm, long bytesRead,
      DataInputStream in) throws IOException, BadFileException {
    if (byteAm <= 0) {
      return new VerifyOutput(0, 0, 0, 0);
    }
    final int headerLength = DataWriter.getHeaderLength();
    long chunksSame = 0;
    long chunksDifferent = 0;
    long readTime = 0;
    long bytesLeft = byteAm;
    // data bytes still to be read from the current segment
    long bufLeft = 0;
    // data bytes already read from the current segment
    long bufRead = 0;
    long seqNum = 0;
    DataHasher hasher = null;
    ByteBuffer readBuf = ByteBuffer.wrap(new byte[bufferSize]);
    while (bytesLeft > 0) {
      if (bufLeft <= 0) {
        if (bytesLeft < headerLength) {
          // no bytes left to read a header
          break;
        }
        // time to read a new header
        ReadInfo header;
        try {
          header = readHeader(in);
        } catch (EOFException e) {
          // eof ok on header reads
          // but not on data reads
          break;
        }
        ++seqNum;
        hasher = new DataHasher(header.getHashValue());
        readTime += header.getTimeTaken();
        bytesRead += header.getBytesRead();
        bytesLeft -= header.getBytesRead();
        bufRead = 0;
        // never read more of the segment than the remaining byte limit
        bufLeft = Math.min(header.getByteAm(), bytesLeft);
        // does the buffer amount have anything??
        if (bufLeft <= 0) {
          continue;
        }
      }
      // figure out the buffer size to read (fits in an int since it is
      // bounded by bufferSize)
      int bufSize = (int) Math.min(bufferSize, Math.min(bytesLeft, bufLeft));
      // read it in
      try {
        long startTime = Timer.now();
        in.readFully(readBuf.array(), 0, bufSize);
        readTime += Timer.elapsed(startTime);
      } catch (EOFException e) {
        throw new BadFileException(
            "Could not read the number of expected data bytes " + bufSize
                + " due to unexpected end of file during sequence " + seqNum, e);
      }
      // update the counters
      bytesRead += bufSize;
      bytesLeft -= bufSize;
      bufLeft -= bufSize;
      // figure out the expected hash offset start point
      long vOffset = determineOffset(bufRead);
      // now update for new position
      bufRead += bufSize;
      // verify what we read, starting from the front of the buffer
      readBuf.rewind();
      VerifyInfo verifyRes = verifyBuffer(readBuf, bufSize, vOffset, hasher);
      // update the verification counters
      chunksSame += verifyRes.getSame();
      chunksDifferent += verifyRes.getDifferent();
    }
    return new VerifyOutput(chunksSame, chunksDifferent, bytesRead, readTime);
  }


  /**
   * Reads a header from the given input stream. The header is
   * DataWriter.getHeaderLength() bytes long and starts with two longs: the
   * hash value followed by the data byte amount.
   * 
   * @param in
   *          input stream to read from
   * 
   * @return ReadInfo holding the byte amount, hash value, time taken and the
   *         header length
   * 
   * @throws IOException
   *           if a read error occurs (an EOFException if end of file is
   *           reached before a full header is read)
   * 
   * @throws BadFileException
   *           if the byte amount read is negative
   */
  ReadInfo readHeader(DataInputStream in) throws IOException,
      BadFileException {
    int headerLen = DataWriter.getHeaderLength();
    byte[] headerBytes = new byte[headerLen];
    long startTime = Timer.now();
    in.readFully(headerBytes);
    long elapsed = Timer.elapsed(startTime);
    ByteBuffer headerBuf = ByteBuffer.wrap(headerBytes);
    long hashValue = headerBuf.getLong();
    long byteAvailable = headerBuf.getLong();
    if (byteAvailable < 0) {
      throw new BadFileException("Invalid negative amount " + byteAvailable
          + " determined for header data amount");
    }
    return new ReadInfo(byteAvailable, hashValue, elapsed, headerLen);
  }
}