/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.s3select;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.TestingTypeManager;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveType;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.mapred.RecordReader;
import org.joda.time.DateTimeZone;
import org.testng.annotations.Test;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;

import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.s3select.S3SelectRecordCursor.updateSplitSchema;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.joining;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_DDL;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
import static org.testng.Assert.assertEquals;
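
/**
 * Tests for {@link S3SelectRecordCursor}: constructor argument validation and the
 * rewriting of a split's Thrift DDL schema by {@code updateSplitSchema}.
 */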
public class TestS3SelectRecordCursor
{
private static final String LAZY_SERDE_CLASS_NAME = LazySimpleSerDe.class.getName();
static final HiveColumnHandle ARTICLE_COLUMN = new HiveColumnHandle("article", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle AUTHOR_COLUMN = new HiveColumnHandle("author", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle DATE_ARTICLE_COLUMN = new HiveColumnHandle("date_pub", HIVE_INT, parseTypeSignature(StandardTypes.DATE), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle QUANTITY_COLUMN = new HiveColumnHandle("quantity", HIVE_INT, parseTypeSignature(StandardTypes.INTEGER), 1, REGULAR, Optional.empty(), Optional.empty());
private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN};
private static final HiveColumnHandle MOCK_HIVE_COLUMN_HANDLE = new HiveColumnHandle("mockName", HiveType.HIVE_FLOAT, parseTypeSignature(StandardTypes.DOUBLE), 88, PARTITION_KEY, Optional.empty(), Optional.empty());
private static final TypeManager MOCK_TYPE_MANAGER = new TestingTypeManager();
private static final Path MOCK_PATH = new Path("mockPath");
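
// Constructor must reject a null splitSchema and a null columns list.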
@Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = "splitSchema is null")
public void shouldFailOnNullSplitSchema()
{
new S3SelectRecordCursor(
new Configuration(),
MOCK_PATH,
MOCK_RECORD_READER,
100L,
null,
singletonList(MOCK_HIVE_COLUMN_HANDLE),
DateTimeZone.UTC,
MOCK_TYPE_MANAGER);
}

@Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = "columns is null")
public void shouldFailOnNullColumns()
{
new S3SelectRecordCursor(
new Configuration(),
MOCK_PATH,
MOCK_RECORD_READER,
100L,
new Properties(),
null,
DateTimeZone.UTC,
MOCK_TYPE_MANAGER);
}
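
// Malformed Thrift DDL strings must be rejected with IllegalArgumentException.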
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Thrift DDL struct article \\{ \\}")
public void shouldThrowIllegalArgumentExceptionWhenSerialDDLHasNoColumns()
{
String ddlSerializationValue = "struct article { }";
buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Thrift DDL should start with struct")
public void shouldThrowIllegalArgumentExceptionWhenSerialDDLNotStartingWithStruct()
{
String ddlSerializationValue = "foo article { varchar article varchar }";
buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Thrift DDL struct article \\{varchar article\\}")
public void shouldThrowIllegalArgumentExceptionWhenSerialDDLBracesNotSpaceSeparated()
{
String ddlSerializationValue = "struct article {varchar article}";
buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Thrift DDL struct article varchar article varchar \\}")
public void shouldThrowIllegalArgumentExceptionWhenMissingOpeningBrace()
{
String ddlSerializationValue = "struct article varchar article varchar }";
buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Thrift DDL struct article\\{varchar article varchar author date date_pub int quantity")
public void shouldThrowIllegalArgumentExceptionWhenSerialDDLFormatIsInvalid()
{
String ddlSerializationValue = "struct article{varchar article varchar author date date_pub int quantity";
buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Thrift DDL struct article \\{ varchar article varchar author date date_pub int quantity ")
public void shouldThrowIllegalArgumentExceptionWhenMissingClosingBrace()
{
String ddlSerializationValue = "struct article { varchar article varchar author date date_pub int quantity ";
buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS);
}
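
// updateSplitSchema rewrites SERIALIZATION_DDL to keep only the DDL columns that match the given Hive column handles.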
@Test
public void shouldFilterOutColumnsThatDoNotMatchTheHiveTable()
{
String ddlSerializationValue = "struct article { varchar address varchar company date date_pub int quantity}";
String expectedDDLSerialization = "struct article { date date_pub, int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS),
buildExpectedProperties(expectedDDLSerialization, DEFAULT_TEST_COLUMNS));
}

@Test
public void shouldReturnOnlyQuantityColumnInTheDdl()
{
String ddlSerializationValue = "struct article { varchar address varchar company date date_pub int quantity}";
String expectedDDLSerialization = "struct article { int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, ARTICLE_COLUMN, QUANTITY_COLUMN),
buildExpectedProperties(expectedDDLSerialization, ARTICLE_COLUMN, QUANTITY_COLUMN));
}

@Test
public void shouldReturnProperties()
{
String ddlSerializationValue = "struct article { varchar article varchar author date date_pub int quantity}";
String expectedDDLSerialization = "struct article { varchar article, varchar author, date date_pub, int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS),
buildExpectedProperties(expectedDDLSerialization, DEFAULT_TEST_COLUMNS));
}

@Test
public void shouldNotDuplicateCommasWhenLastColumnAdjoinsClosingBrace()
{
String ddlSerializationValue = "struct article { varchar article, varchar author, date date_pub, int quantity}";
String expectedDDLSerialization = "struct article { varchar article, varchar author, date date_pub, int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS),
buildExpectedProperties(expectedDDLSerialization, DEFAULT_TEST_COLUMNS));
}

@Test
public void shouldNotDuplicateCommasWhenLastColumnIsSeparatedFromClosingBrace()
{
String ddlSerializationValue = "struct article { varchar article, varchar author, date date_pub, int quantity }";
String expectedDDLSerialization = "struct article { varchar article, varchar author, date date_pub, int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS),
buildExpectedProperties(expectedDDLSerialization, DEFAULT_TEST_COLUMNS));
}

@Test
public void shouldTakeColumnTypesFromHiveColumnsNotFromDdlWhenLastColumnAdjoinsClosingBrace()
{
String ddlSerializationValue = "struct article { int article, double author, xxxx date_pub, int quantity}";
String expectedDDLSerialization = "struct article { int article, double author, xxxx date_pub, int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS),
buildExpectedProperties(expectedDDLSerialization, DEFAULT_TEST_COLUMNS));
}

@Test
public void shouldTakeColumnTypesFromHiveColumnsNotFromDdlWhenLastColumnIsSeparatedFromClosingBrace()
{
String ddlSerializationValue = "struct article { int article, double author, xxxx date_pub, int quantity }";
String expectedDDLSerialization = "struct article { int article, double author, xxxx date_pub, int quantity}";
assertEquals(buildSplitSchema(ddlSerializationValue, DEFAULT_TEST_COLUMNS),
buildExpectedProperties(expectedDDLSerialization, DEFAULT_TEST_COLUMNS));
}
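
// Null-argument checks invoked directly on the static updateSplitSchema helper.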
@Test(expectedExceptions = NullPointerException.class)
public void shouldThrowNullPointerExceptionWhenColumnsIsNull()
{
updateSplitSchema(new Properties(), null);
}

@Test(expectedExceptions = NullPointerException.class)
public void shouldThrowNullPointerExceptionWhenSchemaIsNull()
{
updateSplitSchema(null, ImmutableList.of());
}
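
// Builds a LazySimpleSerDe split schema with the given DDL and passes it through updateSplitSchema.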
private Properties buildSplitSchema(String ddlSerializationValue, HiveColumnHandle... columns)
{
Properties properties = new Properties();
properties.put(SERIALIZATION_LIB, LAZY_SERDE_CLASS_NAME);
properties.put(SERIALIZATION_DDL, ddlSerializationValue);
return updateSplitSchema(properties, asList(columns));
}
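
// Assembles the Properties that updateSplitSchema is expected to produce for the given DDL and columns.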
private Properties buildExpectedProperties(String expectedDDLSerialization, HiveColumnHandle... expectedColumns)
{
String expectedColumnTypes = getTypes(expectedColumns);
String expectedColumnNames = getNames(expectedColumns);
Properties expectedProperties = new Properties();
expectedProperties.put(LIST_COLUMNS, expectedColumnNames);
expectedProperties.put(SERIALIZATION_LIB, LAZY_SERDE_CLASS_NAME);
expectedProperties.put(SERIALIZATION_DDL, expectedDDLSerialization);
expectedProperties.put(LIST_COLUMN_TYPES, expectedColumnTypes);
return expectedProperties;
}

private String getNames(HiveColumnHandle[] expectedColumns)
{
return Stream.of(expectedColumns)
.map(HiveColumnHandle::getName)
.collect(joining(","));
}

private String getTypes(HiveColumnHandle[] expectedColumns)
{
return Stream.of(expectedColumns)
.map(HiveColumnHandle::getHiveType)
.map(HiveType::getTypeInfo)
.map(TypeInfo::getTypeName)
.collect(joining(","));
}
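
// Stub reader passed to the cursor; these tests never read records, so every method throws if called.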
private static final RecordReader<?, ?> MOCK_RECORD_READER = new RecordReader<Object, Object>()
{
@Override
public boolean next(Object key, Object value)
{
throw new UnsupportedOperationException();
}

@Override
public Object createKey()
{
throw new UnsupportedOperationException();
}

@Override
public Object createValue()
{
throw new UnsupportedOperationException();
}

@Override
public long getPos()
{
throw new UnsupportedOperationException();
}

@Override
public void close()
{
throw new UnsupportedOperationException();
}

@Override
public float getProgress()
{
throw new UnsupportedOperationException();
}
};
}