/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Test ABFS stream read and write operation counting.
 */
public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {

  public ITestAbfsStreamStatistics() throws Exception {
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);

  // Number of repeated write/read calls in the large-operations case
  private static final int LARGE_NUMBER_OF_OPS = 99;

  /**
   * Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
   * {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
   */
  @Test
  public void testAbfsStreamOps() throws Exception {
    describe("Test to see correct population of read and write operations in "
        + "Abfs");

    final AzureBlobFileSystem fs = getFileSystem();
    Path smallOperationsFile = path("testOneReadWriteOps");
    Path largeOperationsFile = path("testLargeReadWriteOps");
    FileSystem.Statistics statistics = fs.getFsStatistics();
    String testReadWriteOps = "test this";
    statistics.reset();

    // Test for zero write operations
    assertReadWriteOps("write", 0, statistics.getWriteOps());
    // Test for zero read operations
    assertReadWriteOps("read", 0, statistics.getReadOps());

    FSDataOutputStream outForOneOperation = null;
    FSDataInputStream inForOneOperation = null;
    try {
      outForOneOperation = fs.create(smallOperationsFile);
      statistics.reset();
      outForOneOperation.write(testReadWriteOps.getBytes());

      // Test for a single write operation
      assertReadWriteOps("write", 1, statistics.getWriteOps());

      // Flush the output stream so there is content to read back
      outForOneOperation.hflush();

      inForOneOperation = fs.open(smallOperationsFile);
      statistics.reset();
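      // Note: the byte array returned by getBytes() serves only as a
      // scratch buffer of the right length; its contents get overwritten.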
      int result = inForOneOperation.read(testReadWriteOps.getBytes(), 0,
          testReadWriteOps.getBytes().length);
      LOG.info("Result of read operation: {}", result);
      /*
       * After reading the full content of the file, a read_ops value of 2
       * is expected (3 if anything is read from the buffer as well).
       * Reason: the read() call itself gives read_ops = 1, and reading from
       * AbfsClient (the HTTP GET) gives read_ops = 2.
       *
       * In some cases the ABFS prefetch thread runs in the background,
       * returns some bytes from its buffer, and adds an extra readOp. That
       * makes the readOps value nondeterministic and caused intermittent
       * failures, so readOps values of both 2 and 3 are accepted, depending
       * on the setup.
       */
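      // Worked tally for the single read above: the read() call -> 1 op,
      // the HTTP GET behind it -> 2 ops, an optional prefetch hit -> 3 ops.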
      assertTrue(String.format("The actual value of %d was not equal to the "
              + "expected value of 2 or 3", statistics.getReadOps()),
          statistics.getReadOps() == 2 || statistics.getReadOps() == 3);
    } finally {
      IOUtils.cleanupWithLogger(LOG, inForOneOperation,
          outForOneOperation);
    }

    // Validate that the expected content was written to smallOperationsFile
    assertTrue("Mismatch in content validation",
        validateContent(fs, smallOperationsFile,
            testReadWriteOps.getBytes()));

    FSDataOutputStream outForLargeOperations = null;
    FSDataInputStream inForLargeOperations = null;
    StringBuilder largeOperationsValidationString = new StringBuilder();
    try {
      outForLargeOperations = fs.create(largeOperationsFile);
      statistics.reset();
      int largeValue = LARGE_NUMBER_OF_OPS;
      for (int i = 0; i < largeValue; i++) {
        outForLargeOperations.write(testReadWriteOps.getBytes());
        // Build up the string used for content validation
        largeOperationsValidationString.append(testReadWriteOps);
      }
      LOG.info("Number of bytes of large data written: {}",
          largeOperationsValidationString.toString().getBytes().length);

      // Test for LARGE_NUMBER_OF_OPS write operations
      assertReadWriteOps("write", largeValue, statistics.getWriteOps());

      inForLargeOperations = fs.open(largeOperationsFile);
      for (int i = 0; i < largeValue; i++) {
        inForLargeOperations
            .read(testReadWriteOps.getBytes(), 0,
                testReadWriteOps.getBytes().length);
      }
      if (fs.getAbfsStore().isAppendBlobKey(
          fs.makeQualified(largeOperationsFile).toString())) {
        // For append blobs the data is already flushed, so there might be
        // more data to read and a few extra read operations recorded.
        assertTrue(String.format("The actual value of %d was not in the "
                + "expected range [%d, %d]", statistics.getReadOps(),
            largeValue, largeValue + 4),
            statistics.getReadOps() >= largeValue
                && statistics.getReadOps() <= (largeValue + 4));
      } else {
        // Test for LARGE_NUMBER_OF_OPS read operations
        assertReadWriteOps("read", largeValue, statistics.getReadOps());
      }
    } finally {
      IOUtils.cleanupWithLogger(LOG, inForLargeOperations,
          outForLargeOperations);
    }

    // Validate that the expected content was written to largeOperationsFile
    assertTrue("Mismatch in content validation",
        validateContent(fs, largeOperationsFile,
            largeOperationsValidationString.toString().getBytes()));
  }

  /**
   * Generic method to assert both read and write operation counts.
   *
   * @param operation     the operation being asserted ("read" or "write")
   * @param expectedValue the expected operation count
   * @param actualValue   the actual operation count
   */
  private void assertReadWriteOps(String operation, long expectedValue,
      long actualValue) {
    assertEquals("Mismatch in " + operation + " operations", expectedValue,
        actualValue);
  }
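
  /**
   * A minimal companion check, sketched here using only helpers already
   * used above ({@code describe}, {@code path}, {@code getFileSystem});
   * the method name and test file name are illustrative. It demonstrates
   * the semantics the main test relies on: {@code Statistics#reset()}
   * returns both counters to zero once operations have been recorded.
   */
  @Test
  public void testStatisticsReset() throws Exception {
    describe("Sketch: read/write counters return to zero after reset()");

    final AzureBlobFileSystem fs = getFileSystem();
    Path resetTestFile = path("testStatisticsReset");
    FileSystem.Statistics statistics = fs.getFsStatistics();

    try (FSDataOutputStream out = fs.create(resetTestFile)) {
      out.write("test this".getBytes());
    }
    // At least the write above should have been counted by now
    assertTrue("Expected at least one write operation to be recorded",
        statistics.getWriteOps() > 0);

    statistics.reset();
    // After reset() both counters are expected to be zero again
    assertReadWriteOps("write", 0, statistics.getWriteOps());
    assertReadWriteOps("read", 0, statistics.getReadOps());
  }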
}