TestMagicOutputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.common.ThreadPools;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageTestBase;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.object.Part;
import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
import org.apache.hadoop.fs.tosfs.object.staging.State;
import org.apache.hadoop.fs.tosfs.util.TestUtility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
 * Tests for {@link MagicOutputStream}. Covers three areas:
 * <ul>
 *   <li>mapping a magic path to its destination key;</li>
 *   <li>rejecting non-magic paths;</li>
 *   <li>the {@code .pending} commit record and the multipart upload that a closed stream
 *       leaves behind.</li>
 * </ul>
 */
public class TestMagicOutputStream extends ObjectStorageTestBase {
  /**
   * Size every non-final upload part is expected to have (8 MiB).
   * NOTE(review): presumably mirrors the multipart part size configured via tosConf();
   * confirm against the stream configuration.
   */
  private static final int UPLOAD_PART_SIZE = 8 << 20;

  /** Worker pool handed to every stream created by this test class. */
  private static ExecutorService threadPool;

  @BeforeAll
  public static void beforeClass() {
    threadPool = ThreadPools.newWorkerPool("TestMagicOutputStream-pool");
  }

  @AfterAll
  public static void afterClass() {
    // If beforeClass() failed, the pool was never assigned; avoid masking that with an NPE.
    if (threadPool != null && !threadPool.isShutdown()) {
      threadPool.shutdown();
    }
  }

  private static Path path(String p) {
    return new Path(p);
  }

  private static Path path(Path parent, String child) {
    return new Path(parent, child);
  }

  /**
   * Checks that {@link MagicOutputStream#toDestKey(Path)} drops the magic section of the
   * path, keeping anything under {@code __base}.
   */
  @Test
  public void testCreateDestKey() {
    Object[][] testCases = new Object[][] {
        {path("tos://bucket/__magic/a.txt"), "a.txt"},
        {path("tos://bucket/output/__magic/job-1/tasks/tasks-attempt-0/a.txt"), "output/a.txt"},
        {path("tos://bucket/__magic/job0/task0/__base/a.txt"), "a.txt"},
        {path("tos://bucket/output/__magic/job0/task0/__base/part/part-m-1000"),
            "output/part/part-m-1000"},
        {path("tos://bucket/a/b/c/__magic/__base/d/e/f"), "a/b/c/d/e/f"},
        {path("tos://bucket/a/b/c/__magic/d/e/f"), "a/b/c/f"},
    };
    for (Object[] input : testCases) {
      String actualDestKey = MagicOutputStream.toDestKey((Path) input[0]);
      // JUnit's contract is assertEquals(expected, actual, message).
      assertEquals(input[1], actualDestKey, "Unexpected destination key.");
    }
  }

  /** Checks that a stream cannot be created for a path without a magic component. */
  @Test
  public void testNonMagicPath() {
    try (MagicOutputStream ignored = new TestingMagicOutputStream(path(testDir(), "non-magic"))) {
      fail("Cannot create magic output stream for non-magic path");
    } catch (Exception expected) {
      // Expected: construction throws. fail() raises an AssertionError, which is an Error
      // rather than an Exception, so a wrongly-accepted path is not swallowed here.
    }
  }

  /**
   * Closes a stream without writing anything. Then checks that:
   * <ul>
   *   <li>no staging parts were produced;</li>
   *   <li>the {@code .pending} record describes a zero-length upload with a single part.</li>
   * </ul>
   */
  @Test
  public void testWriteZeroByte() throws IOException {
    Path magic = path(path(testDir(), CommitUtils.MAGIC), "zero-byte.txt");
    MagicOutputStream out = new TestingMagicOutputStream(magic);
    // write zero-byte and close.
    out.close();
    assertStagingFiles(0, out.stagingParts());

    // Read and validate the .pending contents
    try (InputStream in = getStorage().get(out.pendingKey()).stream()) {
      byte[] data = IOUtils.toByteArray(in);
      Pending commit = Pending.deserialize(data);
      assertEquals(getStorage().bucket().name(), commit.bucket());
      assertEquals(out.destKey(), commit.destKey());
      assertTrue(StringUtils.isNotEmpty(commit.uploadId()));
      assertTrue(commit.createdTimestamp() > 0);
      assertEquals(1, commit.parts().size());
      assertEquals(0, commit.length());
      assertEquals(out.upload().uploadId(), commit.uploadId());
    }
  }

  /**
   * Writes {@code len} random bytes through a magic stream and closes it. Then checks:
   * <ul>
   *   <li>the staging parts;</li>
   *   <li>the {@code .pending} record and its part list;</li>
   *   <li>that exactly one multipart upload is open for the destination key;</li>
   *   <li>that the destination object is invisible until the upload is completed, and
   *       holds the written bytes afterwards.</li>
   * </ul>
   *
   * @param len number of bytes to write; the tests in this class only call it with a
   *            positive value
   */
  public void testWrite(int len) throws IOException {
    Path magic = path(path(testDir(), CommitUtils.MAGIC), len + ".txt");
    // Number of parts needed to hold len bytes (ceiling division for len > 0).
    int partNum = (len - 1) / UPLOAD_PART_SIZE + 1;

    MagicOutputStream out = new TestingMagicOutputStream(magic);
    byte[] data = TestUtility.rand(len);
    out.write(data);
    out.close();

    assertStagingFiles(partNum, out.stagingParts());
    assertEquals(ObjectUtils.pathToKey(magic) + CommitUtils.PENDING_SUFFIX, out.pendingKey());

    Pending commit;
    try (InputStream in = getStorage().get(out.pendingKey()).stream()) {
      byte[] serializedData = IOUtils.toByteArray(in);
      commit = Pending.deserialize(serializedData);
      assertEquals(getStorage().bucket().name(), commit.bucket());
      assertEquals(out.destKey(), commit.destKey());
      assertTrue(commit.createdTimestamp() > 0);
      assertEquals(len, commit.length());
      assertEquals(out.upload().uploadId(), commit.uploadId());

      // Verify the upload part list: every part but the last is full-sized.
      assertEquals(partNum, commit.parts().size());
      if (!commit.parts().isEmpty()) {
        for (int i = 0; i < partNum - 1; i += 1) {
          assertEquals(UPLOAD_PART_SIZE, commit.parts().get(i).size());
        }
        Part lastPart = commit.parts().get(partNum - 1);
        assertTrue(lastPart.size() > 0 && lastPart.size() <= UPLOAD_PART_SIZE);
      }
    }

    // Exactly one in-progress multipart upload should exist for the destination key.
    int uploadsNum = 0;
    for (MultipartUpload upload : getStorage().listUploads(out.destKey())) {
      uploadsNum += 1;
      assertEquals(out.upload(), upload);
    }
    assertEquals(1, uploadsNum);

    // The target object is still not visible for object storage.
    assertNull(getStorage().head(out.destKey()));

    // Complete the upload and validate the content.
    getStorage().completeUpload(out.destKey(), out.upload().uploadId(), commit.parts());
    try (InputStream in = getStorage().get(out.destKey()).stream()) {
      assertArrayEquals(data, IOUtils.toByteArray(in));
    }
  }

  @Test
  public void testWrite1MB() throws IOException {
    testWrite(1 << 20);
  }

  @Test
  public void testWrite24MB() throws IOException {
    testWrite(24 << 20);
  }

  @Test
  public void testWrite100MB() throws IOException {
    testWrite(100 << 20);
  }

  /**
   * Asserts that there are {@code expectedNum} staging parts and that each is in the
   * {@link State#CLEANED} state.
   */
  private static void assertStagingFiles(int expectedNum, List<StagingPart> stagings) {
    assertEquals(expectedNum, stagings.size());
    for (StagingPart staging : stagings) {
      assertEquals(State.CLEANED, staging.state());
    }
  }

  /**
   * {@link MagicOutputStream} wired to this test's file system, storage, thread pool and
   * configuration. Its {@code persist} writes directly into the test storage.
   */
  private class TestingMagicOutputStream extends MagicOutputStream {
    TestingMagicOutputStream(Path magic) {
      super(fs(), getStorage(), threadPool, tosConf(), magic);
    }

    /** Stores {@code data} under the object key derived from {@code p}. */
    protected void persist(Path p, byte[] data) {
      getStorage().put(ObjectUtils.pathToKey(p), data);
    }
  }
}