/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

import org.assertj.core.data.Index;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.store.ByteBufferInputStream;
import org.apache.hadoop.test.HadoopTestBase;

import static java.util.Optional.empty;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Unit tests for {@link S3ADataBlocks}.
* Parameterized on the buffer type.
*/
public class TestDataBlocks extends HadoopTestBase {
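
  /**
   * Parameters: the buffer types to test against.
   * @return buffer type parameters
   */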
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{FAST_UPLOAD_BUFFER_DISK},
{FAST_UPLOAD_BUFFER_ARRAY},
{FAST_UPLOAD_BYTEBUFFER}
});
}
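
  /**
   * Temporary directory used when creating disk blocks.
   */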
  @TempDir
  Path tempDir;

/**
* Buffer type.
*/
private String bufferType;
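
  /**
   * Bind the buffer type for a parameterized test case.
   * @param pBufferType buffer type to use
   */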
public void initTestDataBlocks(final String pBufferType) {
this.bufferType = pBufferType;
  }

/**
* Create a block factory.
* @return the factory
*/
private S3ADataBlocks.BlockFactory createFactory() {
switch (bufferType) {
    // the disk factory is passed a file allocation function
case FAST_UPLOAD_BUFFER_DISK:
return new S3ADataBlocks.DiskBlockFactory((i, l) ->
tempDir.resolve("file" + i).toFile());
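    // the array and byte buffer factories take an owner reference,
    // which these tests do not need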
case FAST_UPLOAD_BUFFER_ARRAY:
return new S3ADataBlocks.ArrayBlockFactory(null);
case FAST_UPLOAD_BYTEBUFFER:
return new S3ADataBlocks.ByteBufferBlockFactory(null);
default:
throw new IllegalArgumentException("Unknown buffer type: " + bufferType);
}
  }

/**
* Test the content providers from the block factory and the streams
* they produce.
* There are extra assertions on the {@link ByteBufferInputStream}.
*/
@ParameterizedTest(name = "BufferType : {0}")
@MethodSource("params")
public void testBlockFactoryIO(String pBufferType) throws Throwable {
initTestDataBlocks(pBufferType);
try (S3ADataBlocks.BlockFactory factory = createFactory()) {
int limit = 128;
S3ADataBlocks.DataBlock block
= factory.create(1, limit, null);
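      // with the byte buffer factory, exactly one buffer is now outstanding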
maybeAssertOutstandingBuffers(factory, 1);
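      // write a short string and verify size and remaining capacity tracking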
byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
int bufferLen = buffer.length;
block.write(buffer, 0, bufferLen);
assertEquals(bufferLen, block.dataSize());
assertEquals(limit - bufferLen, block.remainingCapacity(),
"capacity in " + block);
assertTrue(block.hasCapacity(64), "hasCapacity(64) in " + block);
assertTrue(block.hasCapacity(limit - bufferLen),
"No capacity in " + block);
// now start the write
S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
final UploadContentProviders.BaseContentProvider<?> cp =
blockUploadData.getContentProvider();
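      // no stream has been created yet; the provider creates them on demand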
assertStreamCreationCount(cp, 0);
InputStream stream = cp.newStream();
assertStreamCreationCount(cp, 1);
assertThat(stream.markSupported())
.describedAs("markSupported() of %s", stream)
.isTrue();
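      // if this is the byte buffer backed stream, apply extra assertions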
Optional<ByteBufferInputStream> bbStream =
stream instanceof ByteBufferInputStream
? Optional.of((ByteBufferInputStream) stream)
: empty();
bbStream.ifPresent(bb -> {
assertThat(bb.hasRemaining())
.describedAs("hasRemaining() in %s", bb)
.isTrue();
});
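      // verify available(), read the first byte, then mark the stream
      // so reset() can return to this point later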
int expected = bufferLen;
assertAvailableValue(stream, expected);
assertReadEquals(stream, 't');
stream.mark(Integer.MAX_VALUE);
expected--;
assertAvailableValue(stream, expected);
// read into a byte array with an offset
int offset = 5;
byte[] in = new byte[limit];
assertEquals(2, stream.read(in, offset, 2));
assertByteAtIndex(in, offset++, 'e');
assertByteAtIndex(in, offset++, 's');
expected -= 2;
assertAvailableValue(stream, expected);
// read to end
byte[] remainder = new byte[limit];
int c;
int index = 0;
while ((c = stream.read()) >= 0) {
remainder[index++] = (byte) c;
}
assertEquals(expected, index);
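      // the last byte read must be the final character of the test data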
assertByteAtIndex(remainder, --index, 'a');
// no more data left
assertAvailableValue(stream, 0);
bbStream.ifPresent(bb -> {
assertThat(bb.hasRemaining())
.describedAs("hasRemaining() in %s", bb)
.isFalse();
});
      // at the end of the stream, read() returns -1
assertReadEquals(stream, -1);
      // go back to the mark point
stream.reset();
assertAvailableValue(stream, bufferLen - 1);
assertReadEquals(stream, 'e');
// now ask the content provider for another content stream.
final InputStream stream2 = cp.newStream();
assertStreamCreationCount(cp, 2);
// this must close the old stream
bbStream.ifPresent(bb -> {
assertThat(bb.isOpen())
.describedAs("stream %s is open", bb)
.isFalse();
});
// do a read(byte[]) of everything
byte[] readBuffer = new byte[bufferLen];
assertThat(stream2.read(readBuffer))
.describedAs("number of bytes read from stream %s", stream2)
.isEqualTo(bufferLen);
assertThat(readBuffer)
.describedAs("data read into buffer")
.isEqualTo(buffer);
// when the block is closed, the buffer must be returned
// to the pool.
block.close();
maybeAssertOutstandingBuffers(factory, 0);
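      // closing the stream as well must leave no buffers outstanding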
stream.close();
maybeAssertOutstandingBuffers(factory, 0);
      // now that the block is closed, the content provider must fail to
      // create a new stream
intercept(IllegalStateException.class, cp::newStream);
}
}
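
  /**
   * Assert that a byte array contains the expected character at an index.
   * @param bytes byte array to check
   * @param index index in the array
   * @param expected expected character value
   */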
private static void assertByteAtIndex(final byte[] bytes,
final int index, final char expected) {
assertThat(bytes)
.contains(expected, Index.atIndex(index));
}
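
  /**
   * Assert that the next read() of a stream returns the expected value.
   * @param stream stream to read
   * @param ch expected result of read()
   * @throws IOException IO failure
   */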
private static void assertReadEquals(final InputStream stream,
final int ch)
throws IOException {
assertThat(stream.read())
.describedAs("read() in %s", stream)
.isEqualTo(ch);
}
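
  /**
   * Assert the value returned by a stream's available() call.
   * @param stream stream to probe
   * @param expected expected value
   * @throws IOException IO failure
   */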
private static void assertAvailableValue(final InputStream stream,
final int expected) throws IOException {
assertThat(stream.available())
.describedAs("wrong available() in %s", stream)
.isEqualTo(expected);
}
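
  /**
   * Assert how many streams a content provider has created.
   * @param cp content provider
   * @param count expected stream creation count
   */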
private static void assertStreamCreationCount(
final UploadContentProviders.BaseContentProvider<?> cp,
final int count) {
assertThat(cp.getStreamCreationCount())
.describedAs("stream creation count of %s", cp)
.isEqualTo(count);
  }

/**
* Assert the number of buffers active for a block factory,
* if the factory is a ByteBufferBlockFactory.
* <p>
* If it is of any other type, no checks are made.
* @param factory factory
* @param expectedCount expected count.
*/
private static void maybeAssertOutstandingBuffers(
S3ADataBlocks.BlockFactory factory,
int expectedCount) {
if (factory instanceof S3ADataBlocks.ByteBufferBlockFactory) {
S3ADataBlocks.ByteBufferBlockFactory bufferFactory =
(S3ADataBlocks.ByteBufferBlockFactory) factory;
assertThat(bufferFactory.getOutstandingBufferCount())
.describedAs("outstanding buffers in %s", factory)
.isEqualTo(expectedCount);
}
}
}