ITestReadAndSeekPageBlobAfterWrite.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.*;

/**
 * Write data into a page blob and verify you can read back all of it
 * or just a part of it.
 */
public class ITestReadAndSeekPageBlobAfterWrite extends AbstractAzureScaleTest {
  private static final Logger LOG =
      LoggerFactory.getLogger(ITestReadAndSeekPageBlobAfterWrite.class);

  private FileSystem fs;
  private byte[] randomData;

  // Page blob physical page size
  private static final int PAGE_SIZE = PageBlobFormatHelpers.PAGE_SIZE;

  // Size of data on page (excluding header)
  private static final int PAGE_DATA_SIZE = PAGE_SIZE - PageBlobFormatHelpers.PAGE_HEADER_SIZE;
  private static final int MAX_BYTES = 33554432; // maximum bytes in a file that we'll test (32 MB)
  private static final int MAX_PAGES = MAX_BYTES / PAGE_SIZE; // maximum number of pages we'll test
  private Random rand = new Random();

  // A key with a prefix under /pageBlobs, which for the test file system will
  // force use of a page blob.
  private static final String KEY = "/pageBlobs/file.dat";

  // path of page blob file to read and write
  private Path blobPath;

  @Override
  public void setUp() throws Exception {
    super.setUp();
    fs = getTestAccount().getFileSystem();
    // Make sure we are using an integral number of pages.
    assertEquals(0, MAX_BYTES % PAGE_SIZE);

    // load an in-memory array of random data
    randomData = new byte[PAGE_SIZE * MAX_PAGES];
    rand.nextBytes(randomData);

    blobPath = blobPath("ITestReadAndSeekPageBlobAfterWrite");
  }

  @Override
  public void tearDown() throws Exception {
    deleteQuietly(fs, blobPath, true);
    super.tearDown();
  }

  /**
   * Make sure the file name (key) is a page blob file name. If anybody changes that,
   * we need to come back and update this test class.
   */
  @Test
  public void testIsPageBlobFileName() {
    AzureNativeFileSystemStore store = ((NativeAzureFileSystem) fs).getStore();
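    // The path is absolute, so a[0] is empty and a[1] is the top-level directory
    // name; append a trailing '/' to form the directory key that isPageBlobKey checks.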
    String[] a = blobPath.toUri().getPath().split("/");
    String key2 = a[1] + "/";
    assertTrue("Not a page blob: " + blobPath, store.isPageBlobKey(key2));
  }

  /**
   * For a set of different file sizes, write some random data to a page blob,
   * read it back, and verify that the data read matches the data written.
   */
  @Test
  public void testReadAfterWriteRandomData() throws IOException {

    // local shorthand
    final int pds = PAGE_DATA_SIZE;

    // Test for sizes at and near page boundaries
    int[] dataSizes = {

        // on first page
        0, 1, 2, 3,

        // Near first physical page boundary (because the implementation
        // stores PDS + the page header size bytes on each page).
        pds - 1, pds, pds + 1, pds + 2, pds + 3,

        // near second physical page boundary
        (2 * pds) - 1, (2 * pds), (2 * pds) + 1, (2 * pds) + 2, (2 * pds) + 3,

        // near tenth physical page boundary
        (10 * pds) - 1, (10 * pds), (10 * pds) + 1, (10 * pds) + 2, (10 * pds) + 3,

        // test one big size, >> 4MB (an internal buffer size in the code)
        MAX_BYTES
    };

    for (int i : dataSizes) {
      testReadAfterWriteRandomData(i);
    }
  }

  private void testReadAfterWriteRandomData(int size) throws IOException {
    writeRandomData(size);
    readRandomDataAndVerify(size);
  }

  /**
   * Read "size" bytes of data and verify that what was read and what was written
   * are the same.
   */
  private void readRandomDataAndVerify(int size) throws AzureException, IOException {
    byte[] b = new byte[size];
    int bytesRead;
    try (FSDataInputStream stream = fs.open(blobPath)) {
      // A single read() is expected to fill the buffer for these sizes.
      bytesRead = stream.read(b);
    }
    assertEquals(size, bytesRead);

    // compare the data read to the data written
    assertTrue(comparePrefix(randomData, b, size));
  }

  // return true if the beginning "size" values of the arrays are the same
  private boolean comparePrefix(byte[] a, byte[] b, int size) {
    if (a.length < size || b.length < size) {
      return false;
    }
    for (int i = 0; i < size; i++) {
      if (a[i] != b[i]) {
        return false;
      }
    }
    return true;
  }

  // Write a specified amount of random data to the file path for this test class.
  private void writeRandomData(int size) throws IOException {
    try (OutputStream output = fs.create(blobPath)) {
      output.write(randomData, 0, size);
    }
  }

  /**
   * Write data to a page blob, open it, seek, and then read a range of data.
   * Verify that the data read from that range matches the data originally written.
   */
  @Test
  public void testPageBlobSeekAndReadAfterWrite() throws IOException {
    writeRandomData(PAGE_SIZE * MAX_PAGES);
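    // The file is PAGE_SIZE * MAX_PAGES (i.e. MAX_BYTES) bytes long, so the seek
    // positions below all fall within it.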
    int recordSize = 100;
    byte[] b = new byte[recordSize];

    try (FSDataInputStream stream = fs.open(blobPath)) {
      // Seek to a position around the middle of the 6th physical page
      int seekPosition = 5 * PAGE_SIZE + 250;
      stream.seek(seekPosition);

      // Read a record's worth of bytes and verify results
      int bytesRead = stream.read(b);
      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);

      // Seek to another spot and read a record greater than a page
      seekPosition = 10 * PAGE_SIZE + 250;
      stream.seek(seekPosition);
      recordSize = 1000;
      b = new byte[recordSize];
      bytesRead = stream.read(b);
      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);

      // Read the last 100 bytes of the file
      recordSize = 100;
      seekPosition = PAGE_SIZE * MAX_PAGES - recordSize;
      stream.seek(seekPosition);
      b = new byte[recordSize];
      bytesRead = stream.read(b);
      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);

      // Read past the end of the file and we should get only partial data.
      recordSize = 100;
      seekPosition = PAGE_SIZE * MAX_PAGES - recordSize + 50;
      stream.seek(seekPosition);
      b = new byte[recordSize];
      bytesRead = stream.read(b);
      assertEquals(50, bytesRead);

      // compare last 50 bytes written with those read
      byte[] tail = Arrays.copyOfRange(randomData, seekPosition, randomData.length);
      assertTrue(comparePrefix(tail, b, 50));
    }
  }

  // Verify that reading a record of data after seeking gives the expected data.
  private void verifyReadRandomData(byte[] b, int bytesRead, int seekPosition, int recordSize) {
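    // The copy extends one byte past the record; Arrays.copyOfRange zero-pads if
    // the end index runs past randomData.length, and only recordSize bytes are compared.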
    byte[] originalRecordData =
        Arrays.copyOfRange(randomData, seekPosition, seekPosition + recordSize + 1);
    assertEquals(recordSize, bytesRead);
    assertTrue(comparePrefix(originalRecordData, b, recordSize));
  }

  // Test many small flushed writes interspersed with periodic hflush calls.
  // For manual testing, increase NUM_WRITES to a large number.
  // The goal for a long-running manual test is to make sure that it finishes
  // and the close() call does not time out. It also facilitates debugging into
  // hflush/hsync.
  @Test
  public void testManySmallWritesWithHFlush() throws IOException {
    writeAndReadOneFile(50, 100, 20);
  }

  /**
   * Write a total of numWrites * recordLength bytes to a file, read it back,
   * and check to make sure what was read is the same as what was written.
   * The syncInterval is the number of writes after which to call hflush to
   * force the data to storage.
   */
  private void writeAndReadOneFile(int numWrites,
      int recordLength, int syncInterval) throws IOException {

    // A lower bound, in milliseconds, on the time we expect a write to
    // Azure storage to take.
    final long MINIMUM_EXPECTED_TIME = 20;
    LOG.info("Writing {} bytes to {}", numWrites * recordLength, blobPath.getName());
    FSDataOutputStream output = fs.create(blobPath);
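    // blobPath lives under the page blob directory, so this create() is expected
    // to go through the page blob write path (PageBlobOutputStream).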
    int writesSinceHFlush = 0;
    try {

      // Do a flush and hflush to exercise the empty-write-queue case in
      // PageBlobOutputStream and to test its concurrent execution gates.
      output.flush();
      output.hflush();
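      // hflush every syncInterval writes; note this also fires on the first
      // iteration, since 0 % syncInterval == 0.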
      for (int i = 0; i < numWrites; i++) {
        output.write(randomData, i * recordLength, recordLength);
        writesSinceHFlush++;
        output.flush();
        if ((i % syncInterval) == 0) {
          output.hflush();
          writesSinceHFlush = 0;
        }
      }
    } finally {
      long start = Time.monotonicNow();
      output.close();
      long end = Time.monotonicNow();
      LOG.debug("close duration = " + (end - start) + " msec.");
      if (writesSinceHFlush > 0) {
        assertTrue(String.format(
            "close duration with >= 1 pending write is %d, less than minimum expected of %d",
            end - start, MINIMUM_EXPECTED_TIME),
            end - start >= MINIMUM_EXPECTED_TIME);
      }
    }

    // Read the data back and check it.
    final int size = numWrites * recordLength;
    byte[] b = new byte[size];
    try (FSDataInputStream stream = fs.open(blobPath)) {
      stream.seek(0);
      int bytesRead = stream.read(b, 0, size);
      verifyReadRandomData(b, bytesRead, 0, size);
    }

    // delete the file
    fs.delete(blobPath, false);
  }

  // Test writing to a large file repeatedly as a stress test.
  // Set the repetitions to a larger number for manual testing
  // for a longer stress run.
  @Test
  public void testLargeFileStress() throws IOException {
    int numWrites = 32;
    int recordSize = 1024 * 1024;
    int syncInterval = 10;
    int repetitions = 1;
    for (int i = 0; i < repetitions; i++) {
      writeAndReadOneFile(numWrites, recordSize, syncInterval);
    }
  }
  
  // Write to a file repeatedly to verify that it extends.
  // The page blob file should start out at 128MB and finish at 256MB.
  @Test
  public void testFileSizeExtension() throws IOException {
    final int writeSize = 1024 * 1024;
    final int numWrites = 129;
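    // 129 writes of 1 MB each is just past the 128 MB starting size noted above,
    // so the blob has to be extended at least once.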
    final byte dataByte = 5;
    byte[] data = new byte[writeSize];
    Arrays.fill(data, dataByte);
    try (FSDataOutputStream output = fs.create(blobPath)) {
      for (int i = 0; i < numWrites; i++) {
        output.write(data);
        output.hflush();
        LOG.debug("total writes = " + (i + 1));
      }
    }

    // Show that we wrote more than the default page blob file size.
    assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);

    // Verify we can list the new size. That will prove we expanded the file.
    FileStatus[] status = fs.listStatus(blobPath);
    assertEquals("File size hasn't changed " + status,
        numWrites * writeSize, status[0].getLen());
    LOG.debug("Total bytes written to " + blobPath + " = " + status[0].getLen());
    fs.delete(blobPath, false);
  }

}