TestChainTOSInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.object.tos;
import com.volcengine.tos.model.object.GetObjectBasicOutput;
import com.volcengine.tos.model.object.GetObjectV2Output;
import org.apache.hadoop.fs.tosfs.common.Bytes;
import org.apache.hadoop.fs.tosfs.object.Constants;
import org.apache.hadoop.fs.tosfs.util.TestUtility;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Tests for {@link ChainTOSInputStream}. Each test builds a chain whose first underlying stream
 * throws an {@link IOException} once a cut-off position is reached, while the second stream
 * serves the remaining data, and checks that reads and skips still return the expected bytes.
 */
public class TestChainTOSInputStream {
  private static final int DATA_SIZE = 1 << 20;
  private static final byte[] DATA = TestUtility.rand(DATA_SIZE);

  @Test
  public void testRetryReadData() throws IOException {
    int readLen = DATA_SIZE - 1;
    int cutOff = readLen / 2;
    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
        cutOff)) {
      // The read length is larger than the cut-off position and equal to the data length,
      // so the first stream is expected to throw IOException and fall back to the second stream.
      byte[] data = new byte[readLen];
      int n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, 0, readLen), data);
    }

    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
        cutOff)) {
      // The read length is larger than the data length, so the first stream is expected to
      // throw IOException and fall back to the second stream.
      byte[] data = new byte[readLen + 2];
      int n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, 0, readLen), Bytes.toBytes(data, 0, n));
    }

    readLen = DATA_SIZE / 3;
    cutOff = DATA_SIZE / 2;
    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE, 1024,
        cutOff)) {
      for (int i = 0; i <= 3; i++) {
        // The cut-off position lies in (readLen, 2 * readLen), so the first read is served by
        // the first stream, and the second read is expected to meet IOException and fall back
        // to the second stream. The last iteration reads the single remaining byte.
        byte[] data = new byte[readLen];
        int n = stream.read(data);
        int off = i * readLen;
        int len = Math.min(readLen, DATA_SIZE - off);
        assertEquals(len, n);
        assertArrayEquals(Bytes.toBytes(DATA, off, len), Bytes.toBytes(data, 0, len));
      }
    }

    int smallDataSize = 1 << 10;
    cutOff = smallDataSize / 2;
    byte[] smallData = TestUtility.rand(smallDataSize);
    try (ChainTOSInputStream stream = createTestChainTOSInputStream(smallData, 0, smallDataSize,
        1024, cutOff)) {
      for (int i = 0; i < smallDataSize; i++) {
        // The cut-off position is 512, so the single-byte read at index 512 (the 513th read)
        // is expected to meet IOException and fall back to the second stream.
        int read = stream.read();
        assertEquals(smallData[i] & 0xFF, read);
      }
    }
  }

  @Test
  public void testSkipAndRead() throws IOException {
    int cutOff = (DATA_SIZE - 1) / 2;
    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
        cutOff)) {
      // The skip position equals the cut-off position, so once the skip finishes the first read
      // is expected to meet IOException and fall back to the second stream.
      int readPos = (DATA_SIZE - 1) / 2;
      stream.skip(readPos);

      int readLen = 1024;
      byte[] data = new byte[readLen];
      int n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);
    }

    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
        cutOff)) {
      // The skip position is past the cut-off position, so the skip is expected to throw
      // IOException, fall back to the second stream, and skip(readPos) again there.
      int readPos = cutOff + 1024;
      stream.skip(readPos);

      int readLen = 1024;
      byte[] data = new byte[readLen];
      int n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);
    }

    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
        cutOff)) {
      // The skip position is cut-off - 1025, so the skip and the first 1024-byte read succeed on
      // the first stream; the next 1024-byte read is expected to fail on the first stream and
      // fall back to the second stream.
      int readPos = cutOff - 1024 - 1;
      stream.skip(readPos);

      int readLen = 1024;
      byte[] data = new byte[readLen];
      int n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);

      n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, readPos + 1024, readLen), data);
    }

    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
        cutOff)) {
      // 1. Skip 1024 bytes and then read 1024 bytes from the first stream.
      // 2. Then skip cut-off - 512 bytes. The target offset is 1024 + 1024 + cut-off - 512,
      //    which is past the cut-off position, so the second skip is expected to fail and
      //    fall back to the second stream.
      // 3. Read 1024 bytes.
      int readPos = 1024;
      stream.skip(readPos);

      int readLen = 1024;
      byte[] data = new byte[readLen];
      int n = stream.read(data);
      assertEquals(readLen, n);
      assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);

      int skipPos = cutOff - 512;
      stream.skip(skipPos);
      n = stream.read(data);
      assertEquals(readLen, n);
      int targetOff = readPos + 1024 + skipPos;
      assertArrayEquals(Bytes.toBytes(DATA, targetOff, readLen), data);
    }
  }

  /**
   * Creates a ChainTOSInputStream backed by two streams produced by {@link TestObjectFactory}.
   * The first stream throws an IOException with an "unexpected end of stream" message once its
   * read position reaches {@code cutPos}; the second stream never fails and serves the
   * remaining data.
   *
   * @param data         the bytes served by the fake object
   * @param startOff     start offset passed to the ChainTOSInputStream
   * @param endOff       end offset passed to the ChainTOSInputStream
   * @param maxDrainSize max drain size passed to the ChainTOSInputStream
   * @param cutPos       read position at which the first stream starts failing
   * @return a new ChainTOSInputStream over {@code data}
   */
  private ChainTOSInputStream createTestChainTOSInputStream(byte[] data, long startOff, long endOff,
      long maxDrainSize, long cutPos) {
    String key = "dummy-key";
    TOS.GetObjectFactory factory = new TestObjectFactory(data, Arrays.asList(cutPos, -1L));
    return new ChainTOSInputStream(factory, key, startOff, endOff, maxDrainSize, 1);
  }

  /**
   * Fake object factory. The i-th call to {@link #create} returns a stream over
   * {@code data[offset, min(end, data.length))} that breaks at {@code streamBreakPoses.get(i)}
   * (-1 means never). Once every break position has been used, further calls throw.
   */
  private static class TestObjectFactory implements TOS.GetObjectFactory {
    private final byte[] data;
    private final List<Long> streamBreakPoses;
    // Index of the break position to use for the next created stream.
    private int streamIndex = 0;

    TestObjectFactory(byte[] data, List<Long> streamBreakPoses) {
      this.data = data;
      this.streamBreakPoses = streamBreakPoses;
    }

    @Override
    public GetObjectOutput create(String key, long offset, long end) {
      long len = Math.min(end, data.length) - offset;
      ByteArrayInputStream dataIn = new ByteArrayInputStream(this.data, (int) offset, (int) len);
      if (streamIndex < streamBreakPoses.size()) {
        return new GetObjectOutput(new GetObjectV2Output(new GetObjectBasicOutput(),
            new UnExpectedEndOfStream(dataIn, streamBreakPoses.get(streamIndex++))),
            Constants.MAGIC_CHECKSUM);
      } else {
        throw new RuntimeException("No more output");
      }
    }
  }

  /**
   * Wraps an in-memory stream and throws an IOException from any read call that starts at or
   * beyond {@code breakPos} bytes. A {@code breakPos} of -1 disables the failure.
   *
   * <p>The position is checked only at the start of each call, so a single bulk read that starts
   * before {@code breakPos} may return bytes past it.
   */
  private static class UnExpectedEndOfStream extends InputStream {
    private final ByteArrayInputStream delegate;
    private final long breakPos;
    // Number of bytes actually returned so far; end-of-stream (-1) results do not move it.
    private long readPos;

    UnExpectedEndOfStream(ByteArrayInputStream stream, long breakPos) {
      this.delegate = stream;
      this.breakPos = breakPos;
    }

    @Override
    public int read() throws IOException {
      checkNotBroken();
      int b = delegate.read();
      if (b != -1) {
        readPos += 1;
      }
      return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      checkNotBroken();
      int n = delegate.read(b, off, len);
      // Only advance on real data; adding -1 at EOF would move the position backwards.
      if (n > 0) {
        readPos += n;
      }
      return n;
    }

    /** Throws once the read position has reached the configured break position. */
    private void checkNotBroken() throws IOException {
      if (breakPos != -1 && readPos >= breakPos) {
        throw new IOException("unexpected end of stream on dummy source.");
      }
    }
  }
}