BinaryConsumerTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.jdbc.consumer;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.junit.jupiter.api.Test;
public class BinaryConsumerTest extends AbstractConsumerTest {
private static final int INITIAL_VALUE_ALLOCATION = BaseValueVector.INITIAL_VALUE_ALLOCATION;
private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
interface InputStreamConsumer {
void consume(BinaryConsumer consumer) throws IOException;
}
protected void assertConsume(boolean nullable, InputStreamConsumer dataConsumer, byte[][] expect)
throws IOException {
try (final VarBinaryVector vector = new VarBinaryVector("binary", allocator)) {
BinaryConsumer consumer = BinaryConsumer.createConsumer(vector, 0, nullable);
dataConsumer.consume(consumer);
assertEquals(expect.length - 1, vector.getLastSet());
for (int i = 0; i < expect.length; i++) {
byte[] value = expect[i];
if (value == null) {
assertTrue(vector.isNull(i));
} else {
assertArrayEquals(expect[i], vector.get(i));
}
}
}
}
private byte[] createBytes(int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
bytes[i] = (byte) (i % 1024);
}
return bytes;
}
public void testConsumeInputStream(byte[][] values, boolean nullable) throws IOException {
assertConsume(
nullable,
binaryConsumer -> {
for (byte[] value : values) {
if (value != null) {
binaryConsumer.consume(new ByteArrayInputStream(value));
} else {
binaryConsumer.consume((InputStream) null);
}
binaryConsumer.moveWriterPosition();
}
},
values);
}
@Test
public void testConsumeInputStream() throws IOException {
testConsumeInputStream(new byte[][] {createBytes(DEFAULT_RECORD_BYTE_COUNT)}, false);
testConsumeInputStream(
new byte[][] {
createBytes(DEFAULT_RECORD_BYTE_COUNT), createBytes(DEFAULT_RECORD_BYTE_COUNT)
},
false);
testConsumeInputStream(
new byte[][] {
createBytes(DEFAULT_RECORD_BYTE_COUNT * 2),
createBytes(DEFAULT_RECORD_BYTE_COUNT),
createBytes(DEFAULT_RECORD_BYTE_COUNT)
},
false);
testConsumeInputStream(
new byte[][] {createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT)}, false);
testConsumeInputStream(
new byte[][] {
createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT * 10),
},
false);
testConsumeInputStream(
new byte[][] {
createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT),
createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT)
},
false);
testConsumeInputStream(
new byte[][] {
createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT),
createBytes(DEFAULT_RECORD_BYTE_COUNT),
createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT)
},
false);
byte[][] testRecords = new byte[INITIAL_VALUE_ALLOCATION * 2][];
for (int i = 0; i < testRecords.length; i++) {
testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT);
}
testConsumeInputStream(testRecords, false);
byte[] bytes1 = new byte[] {1, 2, 3};
byte[] bytes2 = new byte[] {4, 5, 6};
testConsumeInputStream(new byte[][] {bytes1, null, bytes2}, true);
}
}