TestAvroDecoder.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.decoder.avro;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.DecoderTestColumnHandle;
import com.facebook.presto.decoder.FieldValueProvider;
import com.facebook.presto.decoder.RowDecoder;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slices;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.assertj.core.api.ThrowableAssert;
import org.testng.annotations.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.common.type.VarcharType.createVarcharType;
import static com.facebook.presto.decoder.util.DecoderTestUtil.checkIsNull;
import static com.facebook.presto.decoder.util.DecoderTestUtil.checkValue;
import static com.facebook.presto.testing.TestingEnvironment.FUNCTION_AND_TYPE_MANAGER;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class TestAvroDecoder
{
private static final String DATA_SCHEMA = "dataSchema";
private static final AvroRowDecoderFactory DECODER_FACTORY = new AvroRowDecoderFactory();
private static final Type VACHAR_MAP_TYPE = FUNCTION_AND_TYPE_MANAGER.getType(parseTypeSignature("map<varchar,varchar>"));
private static final Type DOUBLE_MAP_TYPE = FUNCTION_AND_TYPE_MANAGER.getType(parseTypeSignature("map<varchar,double>"));
private static Schema getAvroSchema(Map<String, String> fields)
{
Schema.Parser parser = new Schema.Parser();
SchemaBuilder.FieldAssembler<Schema> fieldAssembler = getFieldBuilder();
for (Map.Entry<String, String> field : fields.entrySet()) {
SchemaBuilder.FieldBuilder<Schema> fieldBuilder = fieldAssembler.name(field.getKey());
Schema fieldSchema = parser.parse(field.getValue());
SchemaBuilder.GenericDefault<Schema> genericDefault = fieldBuilder.type(fieldSchema);
switch (fieldSchema.getType()) {
case ARRAY:
genericDefault.withDefault(ImmutableList.of());
break;
case MAP:
genericDefault.withDefault(ImmutableMap.of());
break;
case UNION:
if (fieldSchema.getTypes().stream()
.map(Schema::getType)
.anyMatch(Schema.Type.NULL::equals)) {
genericDefault.withDefault(null);
}
else {
genericDefault.noDefault();
}
break;
case NULL:
genericDefault.withDefault(null);
break;
default:
genericDefault.noDefault();
}
}
return fieldAssembler.endRecord();
}
private Map<DecoderColumnHandle, FieldValueProvider> buildAndDecodeColumns(Set<DecoderColumnHandle> columns, Map<String, String> fieldSchema, Map<String, Object> fieldValue)
{
Schema schema = getAvroSchema(fieldSchema);
byte[] avroData = buildAvroData(schema, fieldValue);
return decodeRow(
avroData,
columns,
ImmutableMap.of(DATA_SCHEMA, schema.toString()));
}
private Map<DecoderColumnHandle, FieldValueProvider> buildAndDecodeColumn(DecoderTestColumnHandle column, String columnName, String columnType, Object actualValue)
{
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumns(
ImmutableSet.of(column),
ImmutableMap.of(columnName, columnType),
ImmutableMap.of(columnName, actualValue));
assertEquals(decodedRow.size(), 1);
return decodedRow;
}
private static Map<DecoderColumnHandle, FieldValueProvider> decodeRow(byte[] avroData, Set<DecoderColumnHandle> columns, Map<String, String> dataParams)
{
RowDecoder rowDecoder = DECODER_FACTORY.create(dataParams, columns);
return rowDecoder.decodeRow(avroData, null)
.orElseThrow(AssertionError::new);
}
private static byte[] buildAvroData(Schema schema, String name, Object value)
{
return buildAvroData(schema, ImmutableMap.of(name, value));
}
private static byte[] buildAvroData(Schema schema, Map<String, Object> values)
{
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
buildAvroRecord(schema, outputStream, values);
return outputStream.toByteArray();
}
private static GenericData.Record buildAvroRecord(Schema schema, ByteArrayOutputStream outputStream, Map<String, Object> values)
{
GenericData.Record record = new GenericData.Record(schema);
values.forEach(record::put);
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
dataFileWriter.create(schema, outputStream);
dataFileWriter.append(record);
}
catch (IOException e) {
throw new RuntimeException("Failed to convert to Avro.", e);
}
return record;
}
@Test
public void testStringDecodedAsVarchar()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", VARCHAR, "string_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "string_field", "\"string\"", "Mon Jul 28 20:38:07 +0000 2014");
checkValue(decodedRow, row, "Mon Jul 28 20:38:07 +0000 2014");
}
@Test
public void testSchemaEvolutionAddingColumn()
throws Exception
{
DecoderTestColumnHandle originalColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field", null, null, false, false, false);
DecoderTestColumnHandle newlyAddedColumn = new DecoderTestColumnHandle(1, "row1", VARCHAR, "string_field_added", null, null, false, false, false);
// the decoded avro data file does not have string_field_added
byte[] originalData = buildAvroData(getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.endRecord(),
"string_field", "string_field_value");
String addedColumnSchema = getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.name("string_field_added").type().optional().stringType()
.endRecord().toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = decodeRow(
originalData,
ImmutableSet.of(originalColumn, newlyAddedColumn),
ImmutableMap.of(DATA_SCHEMA, addedColumnSchema));
assertEquals(decodedRow.size(), 2);
checkValue(decodedRow, originalColumn, "string_field_value");
checkIsNull(decodedRow, newlyAddedColumn);
}
@Test
public void testEnumDecodedAsVarchar()
{
Schema schema = SchemaBuilder.record("record")
.fields()
.name("enum_field")
.type()
.enumeration("Weekday")
.symbols("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")
.noDefault()
.endRecord();
Schema enumType = schema.getField("enum_field").schema();
GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(enumType, "Wednesday");
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", VARCHAR, "enum_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "enum_field", enumType.toString(), enumValue);
checkValue(decodedRow, row, "Wednesday");
}
@Test
public void testSchemaEvolutionRenamingColumn()
throws Exception
{
byte[] originalData = buildAvroData(getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.endRecord(),
"string_field", "string_field_value");
DecoderTestColumnHandle renamedColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field_renamed", null, null, false, false, false);
String renamedColumnSchema = getFieldBuilder()
.name("string_field_renamed").type().optional().stringType()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalData,
ImmutableSet.of(renamedColumn),
ImmutableMap.of(DATA_SCHEMA, renamedColumnSchema));
assertEquals(decodedEvolvedRow.size(), 1);
checkIsNull(decodedEvolvedRow, renamedColumn);
}
@Test
public void testSchemaEvolutionRemovingColumn()
throws Exception
{
byte[] originalData = buildAvroData(getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.name("string_field_to_be_removed").type().optional().stringType()
.endRecord(),
ImmutableMap.of(
"string_field", "string_field_value",
"string_field_to_be_removed", "removed_field_value"));
DecoderTestColumnHandle evolvedColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field", null, null, false, false, false);
String removedColumnSchema = getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalData,
ImmutableSet.of(evolvedColumn),
ImmutableMap.of(DATA_SCHEMA, removedColumnSchema));
assertEquals(decodedEvolvedRow.size(), 1);
checkValue(decodedEvolvedRow, evolvedColumn, "string_field_value");
}
@Test
public void testSchemaEvolutionIntToLong()
throws Exception
{
byte[] originalIntData = buildAvroData(getFieldBuilder()
.name("int_to_long_field").type().intType().noDefault()
.endRecord(),
"int_to_long_field", 100);
DecoderTestColumnHandle longColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", BIGINT, "int_to_long_field", null, null, false, false, false);
String changedTypeSchema = getFieldBuilder()
.name("int_to_long_field").type().longType().noDefault()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalIntData,
ImmutableSet.of(longColumnReadingIntData),
ImmutableMap.of(DATA_SCHEMA, changedTypeSchema));
assertEquals(decodedEvolvedRow.size(), 1);
checkValue(decodedEvolvedRow, longColumnReadingIntData, 100);
}
@Test
public void testSchemaEvolutionIntToDouble()
throws Exception
{
byte[] originalIntData = buildAvroData(getFieldBuilder()
.name("int_to_double_field").type().intType().noDefault()
.endRecord(),
"int_to_double_field", 100);
DecoderTestColumnHandle doubleColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", DOUBLE, "int_to_double_field", null, null, false, false, false);
String changedTypeSchema = getFieldBuilder()
.name("int_to_double_field").type().doubleType().noDefault()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalIntData,
ImmutableSet.of(doubleColumnReadingIntData),
ImmutableMap.of(DATA_SCHEMA, changedTypeSchema));
assertEquals(decodedEvolvedRow.size(), 1);
checkValue(decodedEvolvedRow, doubleColumnReadingIntData, 100.0);
}
@Test
public void testSchemaEvolutionToIncompatibleType()
throws Exception
{
byte[] originalIntData = buildAvroData(getFieldBuilder()
.name("int_to_string_field").type().intType().noDefault()
.endRecord(),
"int_to_string_field", 100);
DecoderTestColumnHandle stringColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", VARCHAR, "int_to_string_field", null, null, false, false, false);
String changedTypeSchema = getFieldBuilder()
.name("int_to_string_field").type().stringType().noDefault()
.endRecord()
.toString();
assertThatThrownBy(() -> decodeRow(originalIntData, ImmutableSet.of(stringColumnReadingIntData), ImmutableMap.of(DATA_SCHEMA, changedTypeSchema)))
.isInstanceOf(PrestoException.class)
.hasCauseExactlyInstanceOf(AvroTypeException.class)
.hasStackTraceContaining("Found int, expecting string")
.hasMessageMatching("Decoding Avro record failed.");
}
@Test
public void testLongDecodedAsBigint()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", BIGINT, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "\"long\"", 493857959588286460L);
checkValue(decodedRow, row, 493857959588286460L);
}
@Test
public void testIntDecodedAsBigint()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", BIGINT, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "\"int\"", 100);
checkValue(decodedRow, row, 100);
}
@Test
public void testIntDecodedAsInteger()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", INTEGER, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "\"int\"", 100_000);
checkValue(decodedRow, row, 100_000);
}
@Test
public void testIntDecodedAsSmallInt()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", SMALLINT, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "\"int\"", 1000);
checkValue(decodedRow, row, 1000);
}
@Test
public void testIntDecodedAsTinyInt()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", TINYINT, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "\"int\"", 100);
checkValue(decodedRow, row, 100);
}
@Test
public void testFloatDecodedAsDouble()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", DOUBLE, "float_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "float_field", "\"float\"", 10.2f);
checkValue(decodedRow, row, 10.2);
}
@Test
public void testFloatDecodedAsReal()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", REAL, "float_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "float_field", "\"float\"", 10.2f);
checkValue(decodedRow, row, 10.2);
}
@Test
public void testBytesDecodedAsVarbinary()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", VARBINARY, "encoded", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "encoded", "\"bytes\"", ByteBuffer.wrap("mytext".getBytes(UTF_8)));
checkValue(decodedRow, row, "mytext");
}
@Test
public void testFixedDecodedAsVarbinary()
{
Schema schema = SchemaBuilder.record("record")
.fields().name("fixed_field")
.type()
.fixed("fixed5")
.size(5)
.noDefault()
.endRecord();
Schema fixedType = schema.getField("fixed_field").schema();
GenericData.Fixed fixedValue = new GenericData.Fixed(schema.getField("fixed_field").schema());
byte[] bytes = {5, 4, 3, 2, 1};
fixedValue.bytes(bytes);
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", VARBINARY, "fixed_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "fixed_field", fixedType.toString(), fixedValue);
checkValue(decodedRow, row, Slices.wrappedBuffer(bytes));
}
@Test
public void testDoubleDecodedAsDouble()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", DOUBLE, "double_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "double_field", "\"double\"", 56.898);
checkValue(decodedRow, row, 56.898);
}
@Test
public void testStringDecodedAsVarcharN()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", createVarcharType(10), "varcharn_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "varcharn_field", "\"string\"", "abcdefghijklmno");
checkValue(decodedRow, row, "abcdefghij");
}
@Test
public void testNestedRecord()
throws Exception
{
String schema = "{\"type\" : \"record\", " +
" \"name\" : \"nested_schema\"," +
" \"namespace\" : \"com.facebook.presto.decoder.avro\"," +
" \"fields\" :" +
" [{" +
" \"name\":\"nested\"," +
" \"type\":{" +
" \"type\":\"record\"," +
" \"name\":\"Nested\"," +
" \"fields\":" +
" [" +
" {" +
" \"name\":\"id\"," +
" \"type\":[\"long\", \"null\"]" +
" }" +
" ]" +
" }" +
" }]}";
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", BIGINT, "nested/id", null, null, false, false, false);
Schema nestedSchema = new Schema.Parser().parse(schema);
Schema userSchema = nestedSchema.getField("nested").schema();
GenericData.Record userRecord = buildAvroRecord(userSchema, new ByteArrayOutputStream(), ImmutableMap.of("id", 98247748L));
byte[] avroData = buildAvroData(nestedSchema, "nested", userRecord);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = decodeRow(
avroData,
ImmutableSet.of(row),
ImmutableMap.of(DATA_SCHEMA, schema));
assertEquals(decodedRow.size(), 1);
checkValue(decodedRow, row, 98247748);
}
@Test
public void testRuntimeDecodingFailure()
{
DecoderTestColumnHandle booleanColumn = new DecoderTestColumnHandle(0, "some_column", BOOLEAN, "long_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(booleanColumn, "long_field", "\"long\"", (long) 1);
assertThatThrownBy(decodedRow.get(booleanColumn)::getBoolean)
.isInstanceOf(PrestoException.class)
.hasMessageMatching("cannot decode object of 'class java.lang.Long' as 'boolean' for column 'some_column'");
}
@Test
public void testArrayDecodedAsArray()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", new ArrayType(BIGINT), "array_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "array_field", "{\"type\": \"array\", \"items\": \"long\"}", ImmutableList.of(114L, 136L));
checkArrayValue(decodedRow, row, new long[] {114, 136});
}
@Test
public void testArrayWithNulls()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", new ArrayType(BIGINT), "array_field", null, null, false, false, false);
List<Long> values = new ArrayList<>();
values.add(null);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "array_field", "{\"type\": \"array\", \"items\": \"null\"}", values);
checkArrayItemIsNull(decodedRow, row, new long[] {0});
}
@Test
public void testMapDecodedAsMap()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", VACHAR_MAP_TYPE, "map_field", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "map_field", "{\"type\": \"map\", \"values\": \"string\"}", ImmutableMap.of(
"key1", "abc",
"key2", "def",
"key3", "zyx"));
checkMapValue(decodedRow, row, ImmutableMap.of(
"key1", "abc",
"key2", "def",
"key3", "zyx"));
}
@Test
public void testMapWithNull()
throws Exception
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", VACHAR_MAP_TYPE, "map_field", null, null, false, false, false);
Map<String, String> expectedValues = new HashMap<>();
expectedValues.put("key1", null);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "map_field", "{\"type\": \"map\", \"values\": \"null\"}", expectedValues);
checkMapValue(decodedRow, row, expectedValues);
}
private static void checkArrayValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, long[] expected)
{
Block actualBlock = getBlock(decodedRow, handle);
assertEquals(actualBlock.getPositionCount(), expected.length);
for (int i = 0; i < actualBlock.getPositionCount(); i++) {
assertFalse(actualBlock.isNull(i));
assertEquals(BIGINT.getLong(actualBlock, i), expected[i]);
}
}
private static void checkArrayItemIsNull(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, long[] expected)
{
Block actualBlock = getBlock(decodedRow, handle);
assertEquals(actualBlock.getPositionCount(), expected.length);
for (int i = 0; i < actualBlock.getPositionCount(); i++) {
assertTrue(actualBlock.isNull(i));
assertEquals(BIGINT.getLong(actualBlock, i), expected[i]);
}
}
private static void checkMapValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderTestColumnHandle handle, Map<String, String> expected)
{
Block actualBlock = getBlock(decodedRow, handle);
assertEquals(actualBlock.getPositionCount(), expected.size() * 2);
for (int i = 0; i < actualBlock.getPositionCount(); i += 2) {
String actualKey = VARCHAR.getSlice(actualBlock, i).toStringUtf8();
String actualValue;
if (actualBlock.isNull(i + 1)) {
actualValue = null;
}
else {
actualValue = VARCHAR.getSlice(actualBlock, i + 1).toStringUtf8();
}
assertTrue(expected.containsKey(actualKey));
assertEquals(actualValue, expected.get(actualKey));
}
}
private static Block getBlock(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle)
{
FieldValueProvider provider = decodedRow.get(handle);
assertNotNull(provider);
return provider.getBlock();
}
@Test
public void testInvalidExtraneousParameters()
{
assertThatThrownBy(() -> singleColumnDecoder(BigintType.BIGINT, "mapping", null, "hint", false, false, false))
.isInstanceOf(PrestoException.class)
.hasMessageMatching("unexpected format hint 'hint' defined for column 'some_column'");
assertThatThrownBy(() -> singleColumnDecoder(BigintType.BIGINT, "mapping", null, null, false, false, true))
.isInstanceOf(PrestoException.class)
.hasMessageMatching("unexpected internal column 'some_column'");
}
@Test
public void testSupportedDataTypeValidation()
{
// supported types
singleColumnDecoder(BigintType.BIGINT);
singleColumnDecoder(VarbinaryType.VARBINARY);
singleColumnDecoder(BooleanType.BOOLEAN);
singleColumnDecoder(DoubleType.DOUBLE);
singleColumnDecoder(createUnboundedVarcharType());
singleColumnDecoder(createVarcharType(100));
singleColumnDecoder(new ArrayType(BigintType.BIGINT));
singleColumnDecoder(VACHAR_MAP_TYPE);
singleColumnDecoder(DOUBLE_MAP_TYPE);
// some unsupported types
assertUnsupportedColumnTypeException(() -> singleColumnDecoder(DecimalType.createDecimalType(10, 4)));
}
private void assertUnsupportedColumnTypeException(ThrowableAssert.ThrowingCallable callable)
{
assertThatThrownBy(callable)
.isInstanceOf(PrestoException.class)
.hasMessageMatching("Unsupported column type .* for column .*");
}
private static SchemaBuilder.FieldAssembler<Schema> getFieldBuilder()
{
return SchemaBuilder.record("test_schema")
.namespace("com.facebook.presto.decoder.avro")
.fields();
}
private void singleColumnDecoder(Type columnType)
{
String someSchema = getFieldBuilder()
.name("dummy").type().longType().noDefault()
.endRecord()
.toString();
DECODER_FACTORY.create(ImmutableMap.of(DATA_SCHEMA, someSchema), ImmutableSet.of(new DecoderTestColumnHandle(0, "some_column", columnType, "0", null, null, false, false, false)));
}
private void singleColumnDecoder(Type columnType, String mapping, String dataFormat, String formatHint, boolean keyDecoder, boolean hidden, boolean internal)
{
String someSchema = getFieldBuilder()
.name("dummy").type().longType().noDefault()
.endRecord()
.toString();
DECODER_FACTORY.create(ImmutableMap.of(DATA_SCHEMA, someSchema), ImmutableSet.of(new DecoderTestColumnHandle(0, "some_column", columnType, mapping, dataFormat, formatHint, keyDecoder, hidden, internal)));
}
}