// AbstractContractStreamIOStatisticsTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatisticsSource;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
/**
* Tests {@link IOStatistics} support in input and output streams.
* <p>
* Requires both the input and output streams to offer the basic
* bytes read/written statistics.
* </p>
* If the IO is buffered, subclasses must declare that by overriding
* {@link #streamWritesInBlocks()} and {@link #readBufferSize()};
* the input buffer size is especially important.
*/
public abstract class AbstractContractStreamIOStatisticsTest
extends AbstractFSContractTestBase {
/** Logger used by the test cases and by the end-of-class statistics dump. */
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractStreamIOStatisticsTest.class);
/**
 * FileSystem statistics are collected across every test case.
 * <p>
 * {@link #teardown()} aggregates the statistics of the test filesystem
 * into this snapshot whenever that filesystem is an
 * {@link IOStatisticsSource}; {@link #dumpFileSystemIOStatistics()}
 * logs the result after the class has run.
 * </p>
 * The field is static, so a single snapshot is shared by all test cases.
 */
protected static final IOStatisticsSnapshot FILESYSTEM_IOSTATS =
snapshotIOStatistics();
/**
 * Fold the statistics of the test filesystem into
 * {@link #FILESYSTEM_IOSTATS}, then run the superclass teardown.
 * Filesystems which are not an {@link IOStatisticsSource} are skipped.
 * @throws Exception failure raised during teardown
 */
@Override
public void teardown() throws Exception {
  final FileSystem fileSystem = getFileSystem();
  if (fileSystem instanceof IOStatisticsSource) {
    // collect the statistics before the superclass tears anything down
    final IOStatisticsSource source = (IOStatisticsSource) fileSystem;
    FILESYSTEM_IOSTATS.aggregate(source.getIOStatistics());
  }
  super.teardown();
}
/**
 * Log the aggregated filesystem statistics after the class has run,
 * provided that the snapshot contains at least one counter.
 */
@AfterClass
public static void dumpFileSystemIOStatistics() {
  if (FILESYSTEM_IOSTATS.counters().isEmpty()) {
    // no counters were collected, so there is nothing to report
    return;
  }
  LOG.info("Aggregate FileSystem Statistics {}",
      ioStatisticsToPrettyString(FILESYSTEM_IOSTATS));
}
/**
 * Create an output stream and check that its statistics expose every
 * key listed by {@link #outputStreamStatisticKeys()}, and that the list
 * itself includes {@code STREAM_WRITE_BYTES}.
 * The file is deleted afterwards.
 * @throws Throwable on any failure
 */
@Test
public void testOutputStreamStatisticKeys() throws Throwable {
  describe("Look at the statistic keys of an output stream");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  fileSystem.mkdirs(testFile.getParent());
  try (FSDataOutputStream stream = fileSystem.create(testFile, true)) {
    final IOStatistics streamStats = extractStatistics(stream);
    final List<String> requiredKeys = outputStreamStatisticKeys();
    // every required key must be a counter of the stream
    Assertions.assertThat(streamStats.counters().keySet())
        .describedAs("statistic keys of %s", streamStats)
        .containsAll(requiredKeys);
    // and the bytes-written counter must be among the required keys
    Assertions.assertThat(requiredKeys)
        .describedAs("Statistics supported by the stream %s", stream)
        .contains(STREAM_WRITE_BYTES);
  } finally {
    fileSystem.delete(testFile, false);
  }
}
/**
 * If the stream writes in blocks, then counters during the write may be
 * zero until a whole block is written, or the write has finished.
 * <p>
 * The write tests consult this value: when it is true they expect the
 * bytes-written counter to still be 0 after their writes, and to show
 * the full count only once the stream has been closed.
 * </p>
 * This base implementation returns false.
 * @return true if writes are buffered into whole blocks.
 */
public boolean streamWritesInBlocks() {
return false;
}
/**
 * Write one byte to a new file and check the bytes-written counter
 * before the write, after the write and after the stream is closed.
 * While the stream is open the expected value depends on
 * {@link #streamWritesInBlocks()}; after the close it must be 1.
 * @throws Throwable on any failure
 */
@Test
public void testWriteSingleByte() throws Throwable {
  describe("Write a byte to a file and verify"
      + " the stream statistics are updated");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  fileSystem.mkdirs(testFile.getParent());
  // block-buffered streams may not update the counter until close
  final long expectedWhileOpen;
  if (streamWritesInBlocks()) {
    expectedWhileOpen = 0;
  } else {
    expectedWhileOpen = 1;
  }
  try (FSDataOutputStream stream = fileSystem.create(testFile, true)) {
    IOStatistics streamStats = extractStatistics(stream);
    // nothing has been written to the new stream yet
    verifyStatisticCounterValue(streamStats, STREAM_WRITE_BYTES, 0);
    stream.write('0');
    verifyStatisticCounterValue(streamStats, STREAM_WRITE_BYTES,
        expectedWhileOpen);
    // close explicitly so the statistics can be examined afterwards
    stream.close();
    // the statistics must remain available from the closed stream:
    // ask the stream for them again to exercise that path
    streamStats = extractStatistics(stream);
    LOG.info("Statistics = {}", streamStats.toString());
    verifyStatisticCounterValue(streamStats, STREAM_WRITE_BYTES, 1);
  } finally {
    fileSystem.delete(testFile, false);
  }
}
/**
 * Write the same byte array twice to a new file and check the
 * bytes-written counter before the writes, after each flushed write
 * and after the stream is closed.
 * <p>
 * While the stream is open the expected value depends on
 * {@link #streamWritesInBlocks()}; after the close it must be the
 * total number of bytes written. The on-demand string form of the
 * stream statistics must contain that total too.
 * </p>
 * @throws Throwable on any failure
 */
@Test
public void testWriteByteArrays() throws Throwable {
  describe("Write byte arrays to a file and verify"
      + " the stream statistics are updated");
  Path path = methodPath();
  FileSystem fs = getFileSystem();
  fs.mkdirs(path.getParent());
  boolean writesInBlocks = streamWritesInBlocks();
  try (FSDataOutputStream out = fs.create(path, true)) {
    // stringified lazily: each use reflects the statistics at that time
    Object demandStatsString = demandStringifyIOStatisticsSource(out);
    // before a write, no bytes.
    // this matches the initial check made in testWriteSingleByte()
    verifyStatisticCounterValue(extractStatistics(out),
        STREAM_WRITE_BYTES, 0);
    final byte[] bytes = ContractTestUtils.toAsciiByteArray(
        "statistically-speaking");
    final long len = bytes.length;
    out.write(bytes);
    out.flush();
    LOG.info("stats {}", demandStatsString);
    IOStatistics statistics = extractStatistics(out);
    verifyStatisticCounterValue(statistics, STREAM_WRITE_BYTES,
        writesInBlocks ? 0 : len);
    out.write(bytes);
    out.flush();
    verifyStatisticCounterValue(statistics, STREAM_WRITE_BYTES,
        writesInBlocks ? 0 : len * 2);
    // close the stream
    out.close();
    LOG.info("stats {}", demandStatsString);
    // statistics are still valid after the close
    // always call the output stream to check that behavior
    statistics = extractStatistics(out);
    verifyStatisticCounterValue(statistics, STREAM_WRITE_BYTES, len * 2);
    // the toString() value must contain the same counter value
    Assertions.assertThat(demandStatsString.toString())
        .contains(Long.toString(len * 2));
  } finally {
    fs.delete(path, false);
  }
}
/**
 * Open an input stream on an empty file and check that its statistics
 * expose every key listed by {@link #inputStreamStatisticKeys()}, that
 * the list itself includes {@code STREAM_READ_BYTES}, and that the
 * bytes-read counter starts at zero.
 * The file is deleted afterwards.
 * @throws Throwable on any failure
 */
@Test
public void testInputStreamStatisticKeys() throws Throwable {
  describe("Look at the statistic keys of an input stream");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  ContractTestUtils.touch(fileSystem, testFile);
  try (FSDataInputStream stream = fileSystem.open(testFile)) {
    final IOStatistics streamStats = extractStatistics(stream);
    final List<String> requiredKeys = inputStreamStatisticKeys();
    // every required key must be a counter of the stream
    Assertions.assertThat(streamStats.counters().keySet())
        .describedAs("statistic keys of %s", streamStats)
        .containsAll(requiredKeys);
    // and the bytes-read counter must be among the required keys
    Assertions.assertThat(requiredKeys)
        .describedAs("Statistics supported by the stream %s", stream)
        .contains(STREAM_READ_BYTES);
    // nothing has been read from the stream yet
    verifyStatisticCounterValue(streamStats, STREAM_READ_BYTES, 0);
  } finally {
    fileSystem.delete(testFile, false);
  }
}
/**
 * Read a 1024 byte file through the different read calls of the input
 * stream and verify the bytes-read counter after each one.
 * <p>
 * The expected counter values are calculated by
 * {@link #verifyBytesRead(IOStatistics, long, int, int)} using the
 * buffer size declared by {@link #readBufferSize()}. A seek must not
 * change the counter. The reads which overlap the end of the file are
 * only checked for unbuffered streams.
 * </p>
 * @throws Throwable on any failure
 */
@Test
public void testInputStreamStatisticRead() throws Throwable {
  describe("Read Data from an input stream");
  Path path = methodPath();
  FileSystem fs = getFileSystem();
  final int fileLen = 1024;
  final byte[] ds = dataset(fileLen, 'a', 26);
  ContractTestUtils.writeDataset(fs, path, ds, fileLen, 8_000, true);
  try (FSDataInputStream in = fs.open(path)) {
    long current = 0;
    IOStatistics statistics = extractStatistics(in);
    verifyStatisticCounterValue(statistics, STREAM_READ_BYTES, 0);
    Assertions.assertThat(in.read()).isEqualTo('a');
    int bufferSize = readBufferSize();
    // either a single byte was read or a whole block
    current = verifyBytesRead(statistics, current, 1, bufferSize);
    final int bufferLen = 128;
    byte[] buf128 = new byte[bufferLen];
    // read(byte[]) is allowed to return fewer bytes than the array
    // holds, so count the bytes actually returned instead of assuming
    // that the array was filled.
    final int firstRead = in.read(buf128);
    Assertions.assertThat(firstRead)
        .describedAs("bytes returned by read(byte[])")
        .isGreaterThan(0);
    current = verifyBytesRead(statistics, current, firstRead, bufferSize);
    // readFully() either fills the array or fails
    in.readFully(buf128);
    current = verifyBytesRead(statistics, current, bufferLen, bufferSize);
    in.readFully(0, buf128);
    current = verifyBytesRead(statistics, current, bufferLen, bufferSize);
    // seek must not increment the read counter
    in.seek(256);
    verifyBytesRead(statistics, current, 0, bufferSize);
    // if a stream implements lazy-seek the seek operation
    // may be postponed until the read
    final int sublen = 32;
    Assertions.assertThat(in.read(buf128, 0, sublen))
        .isEqualTo(sublen);
    current = verifyBytesRead(statistics, current, sublen, bufferSize);
    // perform some read operations near the end of the file such that
    // the buffer will not be completely read.
    // skip these tests for buffered IO as it is too complex to work out
    if (bufferSize == 0) {
      final int pos = fileLen - sublen;
      in.seek(pos);
      Assertions.assertThat(in.read(buf128))
          .describedAs("Read overlapping EOF")
          .isEqualTo(sublen);
      current = verifyStatisticCounterValue(statistics, STREAM_READ_BYTES,
          current + sublen);
      Assertions.assertThat(in.read(pos, buf128, 0, bufferLen))
          .describedAs("Read(buffer) overlapping EOF")
          .isEqualTo(sublen);
      verifyStatisticCounterValue(statistics, STREAM_READ_BYTES,
          current + sublen);
    }
  } finally {
    fs.delete(path, false);
  }
}
/**
 * Check the bytes-read counter after a read, allowing for buffering.
 * <p>
 * For an unbuffered stream (buffer size of zero or less) the counter
 * must equal the previous count plus the bytes just read. For a
 * buffered stream it must equal the buffer size multiplied by one more
 * than the number of whole buffers contained in the previous count.
 * </p>
 * @param statistics stats
 * @param current current count
 * @param bytesRead bytes explicitly read
 * @param bufferSize buffer size of stream
 * @return the current count of bytes read <i>ignoring block size</i>
 */
public long verifyBytesRead(final IOStatistics statistics,
    final long current,
    final int bytesRead, final int bufferSize) {
  // bytes requested so far: returned to the caller in every case
  final long total = current + bytesRead;
  final long expectedCounter;
  if (bufferSize <= 0) {
    // unbuffered: the counter tracks the requested bytes exactly
    expectedCounter = total;
  } else {
    // buffered: whole buffers already counted, plus the current one
    final long buffersAlreadyRead = current / bufferSize;
    expectedCounter = bufferSize * (1 + buffersAlreadyRead);
  }
  verifyStatisticCounterValue(statistics, STREAM_READ_BYTES, expectedCounter);
  return total;
}
/**
 * Buffer size for reads.
 * Filesystems performing block reads (checksum, etc)
 * must override this to return their buffer size.
 * <p>
 * {@link #testInputStreamStatisticRead()} passes the value to
 * {@link #verifyBytesRead(IOStatistics, long, int, int)}, which for a
 * value greater than zero expects the bytes-read counter to be a whole
 * multiple of it.
 * </p>
 * This base implementation returns 0.
 * @return buffer capacity; 0 for unbuffered
 */
public int readBufferSize() {
return 0;
}
/**
 * Keys which the output stream must support.
 * <p>
 * {@link #testOutputStreamStatisticKeys()} asserts that every key is
 * among the counters of a newly created output stream, and that the
 * list itself contains {@code STREAM_WRITE_BYTES}; an overriding
 * implementation must therefore keep that key in its list.
 * </p>
 * @return a list of keys; this base implementation returns an
 * immutable single-element list holding {@code STREAM_WRITE_BYTES}.
 */
public List<String> outputStreamStatisticKeys() {
return Collections.singletonList(STREAM_WRITE_BYTES);
}
/**
 * Keys which the input stream must support.
 * <p>
 * {@link #testInputStreamStatisticKeys()} asserts that every key is
 * among the counters of a newly opened input stream, and that the
 * list itself contains {@code STREAM_READ_BYTES}; an overriding
 * implementation must therefore keep that key in its list.
 * </p>
 * @return a list of keys; this base implementation returns an
 * immutable single-element list holding {@code STREAM_READ_BYTES}.
 */
public List<String> inputStreamStatisticKeys() {
return Collections.singletonList(STREAM_READ_BYTES);
}
}