TestObjectOutputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.object;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.common.ThreadPools;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
import org.apache.hadoop.fs.tosfs.object.staging.State;
import org.apache.hadoop.fs.tosfs.util.FSUtils;
import org.apache.hadoop.fs.tosfs.util.TempFiles;
import org.apache.hadoop.fs.tosfs.util.TestUtility;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Lists;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestObjectOutputStream extends ObjectStorageTestBase {
private static ExecutorService threadPool;
@BeforeAll
public static void beforeClass() {
threadPool = ThreadPools.newWorkerPool("TestObjectOutputStream-pool");
}
@AfterAll
public static void afterClass() {
if (!threadPool.isShutdown()) {
threadPool.shutdown();
}
}
@Test
public void testMkStagingDir() throws ExecutionException, InterruptedException, IOException {
try (TempFiles tmp = TempFiles.of()) {
List<String> tmpDirs = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
tmpDirs.add(tmp.newDir());
}
Configuration newConf = new Configuration(tosConf());
newConf.set(ConfKeys.FS_MULTIPART_STAGING_DIR.key("filestore"), Joiner.on(",").join(tmpDirs));
// Start multiple threads to open streams to create staging dir.
List<Future<ObjectOutputStream>> futures = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 10; i++) {
futures.add(threadPool.submit(() ->
new ObjectOutputStream(getStorage(), threadPool, newConf, path("none.txt"), true)));
}
for (Future<ObjectOutputStream> f : futures) {
f.get().close();
}
}
}
@Test
public void testWriteZeroByte() throws IOException {
Path zeroByteTxt = path("zero-byte.txt");
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), zeroByteTxt, true);
// write zero-byte and close.
out.write(new byte[0], 0, 0);
out.close();
assertStagingPart(0, out.stagingParts());
// Read and validate the dest object contents
ObjectTestUtils.assertObject(zeroByteTxt, ObjectTestUtils.EMPTY_BYTES);
}
@Test
public void testWriteZeroByteWithoutAllowPut() throws IOException {
Path zeroByteTxt = path("zero-byte-without-allow-put.txt");
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), zeroByteTxt, false);
// write zero-byte and close.
out.close();
assertStagingPart(0, out.stagingParts());
// Read and validate the dest object content.
ObjectTestUtils.assertObject(zeroByteTxt, ObjectTestUtils.EMPTY_BYTES);
}
@Test
public void testDeleteStagingFileWhenUploadPartsOK() throws IOException {
Path path = path("data.txt");
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), path, true);
byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 2));
out.write(data);
out.waitForPartsUpload();
for (StagingPart part : out.stagingParts()) {
assertEquals(State.CLEANED, part.state());
}
out.close();
for (StagingPart part : out.stagingParts()) {
assertEquals(State.CLEANED, part.state());
}
}
@Test
public void testDeleteStagingFileWithClose() throws IOException {
Path path = path("data.txt");
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), path, true);
byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 2));
out.write(data);
out.close();
for (StagingPart part : out.stagingParts()) {
assertEquals(State.CLEANED, part.state());
}
}
@Test
public void testDeleteSimplePutStagingFile() throws IOException {
Path smallTxt = path("small.txt");
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), smallTxt, true);
byte[] data = TestUtility.rand(4 << 20);
out.write(data);
for (StagingPart part : out.stagingParts()) {
assertTrue(part.size() > 0);
}
out.close();
for (StagingPart part : out.stagingParts()) {
assertEquals(State.CLEANED, part.state());
}
}
@Test
public void testSimplePut() throws IOException {
Path smallTxt = path("small.txt");
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), smallTxt, true);
byte[] data = TestUtility.rand(4 << 20);
out.write(data);
out.close();
assertStagingPart(1, out.stagingParts());
assertNull(out.upload(), "Should use the simple PUT to upload object for small file.");
// Read and validate the dest object content.
ObjectTestUtils.assertObject(smallTxt, data);
}
public void testWrite(int uploadPartSize, int len) throws IOException {
Configuration newConf = new Configuration(tosConf());
newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf(), testDir().toUri())),
uploadPartSize);
Path outPath = path(len + ".txt");
int partNum = (len - 1) / uploadPartSize + 1;
byte[] data = TestUtility.rand(len);
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, newConf, outPath, true);
try {
out.write(data);
} finally {
out.close();
}
assertStagingPart(partNum, out.stagingParts());
ObjectTestUtils.assertObject(outPath, data);
// List multipart uploads
int uploadsNum = 0;
for (MultipartUpload ignored : getStorage().listUploads(out.destKey())) {
uploadsNum += 1;
}
assertEquals(0L, uploadsNum);
}
@Test
public void testParallelWriteOneOutPutStream() throws IOException, ExecutionException,
InterruptedException {
testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 128);
testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 1 << 20);
testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 2 << 20);
testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 6 << 20);
}
public void testParallelWriteOneOutPutStreamImpl(int partSize, int epochs, int batchSize)
throws IOException, ExecutionException, InterruptedException {
Configuration newConf = new Configuration(tosConf());
newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf(), testDir().toUri())),
partSize);
String file =
String.format("%d-%d-%d-testParallelWriteOneOutPutStream.txt", partSize, epochs, batchSize);
Path outPath = path(file);
try (ObjectOutputStream out = new ObjectOutputStream(getStorage(), threadPool, newConf, outPath,
true)) {
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < epochs; i++) {
final int index = i;
futures.add(threadPool.submit(() -> {
try {
out.write(dataset(batchSize, index));
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}
// wait for all tasks finished
for (Future<?> future : futures) {
future.get();
}
}
try (InputStream inputStream = getStorage().get(ObjectUtils.pathToKey(outPath)).stream()) {
List<byte[]> ret = new ArrayList<>();
byte[] data = new byte[batchSize];
while (inputStream.read(data) != -1) {
ret.add(data);
data = new byte[batchSize];
}
assertEquals(epochs, ret.size());
List<byte[]> sortedRet = ret.stream()
.sorted(Comparator.comparingInt(o -> o[0]))
.collect(Collectors.toList());
int j = 0;
for (byte[] e : sortedRet) {
assertArrayEquals(dataset(batchSize, j), e);
j++;
}
}
}
public static byte[] dataset(int len, int base) {
byte[] dataset = new byte[len];
for (int i = 0; i < len; i++) {
dataset[i] = (byte) (base);
}
return dataset;
}
@Test
public void testWrite1MB() throws IOException {
testWrite(5 << 20, 1 << 20);
testWrite(8 << 20, 1 << 20);
testWrite(16 << 20, 1 << 20);
}
@Test
public void testWrite24MB() throws IOException {
testWrite(5 << 20, 24 << 20);
testWrite(8 << 20, 24 << 20);
testWrite(16 << 20, 24 << 20);
}
@Test
public void testWrite100MB() throws IOException {
testWrite(5 << 20, 100 << 20);
testWrite(8 << 20, 100 << 20);
testWrite(16 << 20, 100 << 20);
}
private void testMultipartThreshold(int partSize, int multipartThreshold, int dataSize)
throws IOException {
Configuration newConf = new Configuration(tosConf());
newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(scheme()), partSize);
newConf.setLong(ConfKeys.FS_MULTIPART_THRESHOLD.key(scheme()), multipartThreshold);
Path outPath =
path(String.format("threshold-%d-%d-%d.txt", partSize, multipartThreshold, dataSize));
byte[] data = TestUtility.rand(dataSize);
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, newConf, outPath, true);
try {
// Verify for every 1MB data writing, unless reaching the threshold.
int upperLimit = Math.min(multipartThreshold, dataSize);
int curOff = 0;
for (; curOff < upperLimit; curOff += (1 << 20)) {
int end = Math.min(curOff + (1 << 20), upperLimit);
out.write(Arrays.copyOfRange(data, curOff, end));
List<MultipartUpload> uploads = Lists.newArrayList(getStorage().listUploads(out.destKey()));
if (end < multipartThreshold) {
assertEquals(0, uploads.size(),
"Shouldn't has any uploads because it just use simple PUT");
} else {
assertEquals(1, uploads.size(), "Switch to use MPU.");
}
assertEquals((end - 1) / partSize + 1, out.stagingParts().size());
}
// Verify for every 1MB data writing, unless reaching the data size.
for (; curOff < dataSize; curOff += (1 << 20)) {
int end = Math.min(curOff + (1 << 20), dataSize);
out.write(Arrays.copyOfRange(data, curOff, end));
List<MultipartUpload> uploads = Lists.newArrayList(getStorage().listUploads(out.destKey()));
assertEquals(1, uploads.size());
assertEquals(out.destKey(), uploads.get(0).key());
assertEquals((end - 1) / partSize + 1, out.stagingParts().size());
}
} finally {
out.close();
}
assertStagingPart((dataSize - 1) / partSize + 1, out.stagingParts());
ObjectTestUtils.assertObject(outPath, data);
List<MultipartUpload> uploads = Lists.newArrayList(getStorage().listUploads(out.destKey()));
assertEquals(0, uploads.size());
}
@Test
public void testMultipartThreshold2MB() throws IOException {
testMultipartThreshold(5 << 20, 2 << 20, 1 << 20);
testMultipartThreshold(5 << 20, 2 << 20, (2 << 20) - 1);
testMultipartThreshold(5 << 20, 2 << 20, 2 << 20);
testMultipartThreshold(5 << 20, 2 << 20, 4 << 20);
testMultipartThreshold(5 << 20, 2 << 20, 5 << 20);
testMultipartThreshold(5 << 20, 2 << 20, (5 << 20) + 1);
testMultipartThreshold(5 << 20, 2 << 20, 6 << 20);
testMultipartThreshold(5 << 20, 2 << 20, 10 << 20);
testMultipartThreshold(5 << 20, 2 << 20, 20 << 20);
}
@Test
public void testMultipartThreshold5MB() throws IOException {
testMultipartThreshold(5 << 20, 5 << 20, 1 << 20);
testMultipartThreshold(5 << 20, 5 << 20, 4 << 20);
testMultipartThreshold(5 << 20, 5 << 20, 5 << 20);
testMultipartThreshold(5 << 20, 5 << 20, 5 << 20);
testMultipartThreshold(5 << 20, 5 << 20, 6 << 20);
testMultipartThreshold(5 << 20, 5 << 20, 10 << 20);
testMultipartThreshold(5 << 20, 5 << 20, 20 << 20);
}
@Test
public void testMultipartThreshold10MB() throws IOException {
testMultipartThreshold(5 << 20, 10 << 20, 1 << 20);
testMultipartThreshold(5 << 20, 10 << 20, 10 << 20);
testMultipartThreshold(5 << 20, 10 << 20, 11 << 20);
testMultipartThreshold(5 << 20, 10 << 20, 15 << 20);
testMultipartThreshold(5 << 20, 10 << 20, 20 << 20);
testMultipartThreshold(5 << 20, 10 << 20, 40 << 20);
testMultipartThreshold(5 << 20, 10 << 20, 30 << 20);
}
@Test
public void testCloseStreamTwice() throws IOException {
int len = 100;
Path outPath = path(len + ".txt");
int partNum = 1;
byte[] data = TestUtility.rand(len);
ObjectOutputStream out =
new ObjectOutputStream(getStorage(), threadPool, tosConf(), outPath, true);
try {
out.write(data);
out.close();
} finally {
out.close();
}
assertStagingPart(partNum, out.stagingParts());
ObjectTestUtils.assertObject(outPath, data);
}
@Test
public void testWriteClosedStream() throws IOException {
byte[] data = TestUtility.rand(10);
Path outPath = path("testWriteClosedStream.txt");
try (ObjectOutputStream out = new ObjectOutputStream(getStorage(), threadPool, tosConf(),
outPath, true)) {
out.close();
out.write(data);
} catch (IllegalStateException e) {
assertEquals("OutputStream is closed.", e.getMessage());
}
}
private static void assertStagingPart(int expectedNum, List<StagingPart> parts) {
assertEquals(expectedNum, parts.size());
for (StagingPart part : parts) {
assertTrue(part.size() > 0);
}
}
private Path path(String name) {
return new Path(testDir(), name);
}
}