/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.Test;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * End-to-end scale tests between the ABFS client and the ABFS service
 * under heavy traffic.
 */
public class ITestAzureBlobFileSystemE2EScale extends
    AbstractAbfsScaleTest {
  private static final int TEN = 10;
  private static final int ONE_THOUSAND = 1000;
  private static final int BASE_SIZE = 1024;           // 1 KiB
  private static final int ONE_MB = 1024 * 1024;
  private static final int DEFAULT_WRITE_TIMES = 100;  // writes per scale test

  public ITestAzureBlobFileSystemE2EScale() throws Exception {
  }

  @Test
  public void testWriteHeavyBytesToFileAcrossThreads() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testFile = path(methodName.getMethodName());
    final ExecutorService es = Executors.newFixedThreadPool(TEN);

    // 2 * 10 * 1000 * 1024 bytes = 20,480,000 bytes (~19.5 MiB) per write.
    final int testWriteBufferSize = 2 * TEN * ONE_THOUSAND * BASE_SIZE;
    final byte[] b = new byte[testWriteBufferSize];
    new Random().nextBytes(b);
    final List<Future<Void>> tasks = new ArrayList<>();

    final int operationCount = DEFAULT_WRITE_TIMES;
    try (FSDataOutputStream stream = fs.create(testFile)) {
      for (int i = 0; i < operationCount; i++) {
        final Callable<Void> callable = () -> {
          stream.write(b);
          return null;
        };
        tasks.add(es.submit(callable));
      }

      // Wait for every submitted write to finish (and surface any failure)
      // before the stream is closed.
      for (Future<Void> task : tasks) {
        task.get();
      }
    } finally {
      es.shutdownNow();
    }

    final FileStatus fileStatus = fs.getFileStatus(testFile);
    // Widen to long before multiplying: the product (2,048,000,000) sits
    // just below Integer.MAX_VALUE, and getLen() returns a long.
    assertEquals((long) testWriteBufferSize * operationCount,
        fileStatus.getLen());
  }
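
  // A minimal single-threaded baseline (an illustrative addition, not part
  // of the original suite): the same kind of heavy write issued from one
  // thread, useful for isolating concurrency effects when the multi-threaded
  // test above fails. It relies only on helpers already used in this class.
  @Test
  public void testWriteHeavyBytesToFileSingleThreaded() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testFile = path(methodName.getMethodName());
    final byte[] b = new byte[ONE_MB];
    new Random().nextBytes(b);
    try (FSDataOutputStream stream = fs.create(testFile)) {
      for (int i = 0; i < TEN; i++) {
        stream.write(b);  // ten sequential 1 MB writes
      }
    }
    assertEquals("file length after sequential writes",
        (long) TEN * ONE_MB, fs.getFileStatus(testFile).getLen());
  }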

  @Test
  public void testReadWriteHeavyBytesToFileWithStatistics() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final FileSystem.Statistics abfsStatistics;
    final Path testFile = path(methodName.getMethodName());
    int testBufferSize;
    final byte[] sourceData;
    try (FSDataOutputStream stream = fs.create(testFile)) {
      abfsStatistics = fs.getFsStatistics();
      abfsStatistics.reset();

      // 5 * 10 * 1000 * 1024 bytes = 51,200,000 bytes (~48.8 MiB).
      testBufferSize = 5 * TEN * ONE_THOUSAND * BASE_SIZE;
      sourceData = new byte[testBufferSize];
      new Random().nextBytes(sourceData);
      stream.write(sourceData);
    }

    final byte[] remoteData = new byte[testBufferSize];
    int bytesRead = 0;
    try (FSDataInputStream inputStream = fs.open(testFile, 4 * ONE_MB)) {
      // A single read() may legally return fewer bytes than requested,
      // so loop until the buffer is full or EOF is reached.
      int n;
      while (bytesRead < testBufferSize
          && (n = inputStream.read(remoteData, bytesRead,
              testBufferSize - bytesRead)) >= 0) {
        bytesRead += n;
      }
    }

    final String stats = abfsStatistics.toString();
    assertEquals("bytes read in " + stats,
        remoteData.length, abfsStatistics.getBytesRead());
    assertEquals("bytes written in " + stats,
        sourceData.length, abfsStatistics.getBytesWritten());
    assertEquals("bytes read back from the stream", testBufferSize, bytesRead);
    assertArrayEquals("round tripped data", sourceData, remoteData);
  }
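
  // An alternative read-back sketch (an illustrative addition, not part of
  // the original suite): DataInputStream#readFully, inherited by
  // FSDataInputStream, makes the "fill the whole buffer" intent explicit and
  // throws EOFException on a short file, which the manual loop in
  // testReadWriteHeavyBytesToFileWithStatistics spells out by hand.
  private void readBackAndVerify(AzureBlobFileSystem fs, Path file,
      byte[] expected) throws Exception {
    final byte[] actual = new byte[expected.length];
    try (FSDataInputStream in = fs.open(file, 4 * ONE_MB)) {
      in.readFully(actual);  // fills the buffer or throws EOFException
    }
    assertArrayEquals("round tripped data", expected, actual);
  }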
}