TestApplyChangelogFunction.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.FunctionExtractor;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.function.JavaAggregationFunctionImplementation;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.List;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.DELETE;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.INSERT;
import static com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction.NAME;
import static com.facebook.presto.operator.aggregation.AggregationTestUtils.assertAggregation;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
public class TestApplyChangelogFunction
{
private static final FunctionAndTypeManager FUNCTION_AND_TYPE_MANAGER = MetadataManager.createTestMetadataManager().getFunctionAndTypeManager();
@BeforeClass
public static void registerFunction()
{
FUNCTION_AND_TYPE_MANAGER.registerBuiltInFunctions(FunctionExtractor.extractFunctions(ApplyChangelogFunction.class));
}
@Test
public void testInsert()
{
JavaAggregationFunctionImplementation impl = getAggregation(BIGINT, VARCHAR, INTEGER);
assertAggregation(impl,
2,
toBlocks(
record(1, INSERT, 2)));
}
@Test
public void testUpdate()
{
JavaAggregationFunctionImplementation impl = getAggregation(BIGINT, VARCHAR, INTEGER);
assertAggregation(impl,
2,
toBlocks(
record(0, INSERT, 1),
record(1, DELETE, 1),
record(1, INSERT, 2)));
}
@Test
public void testDelete()
{
JavaAggregationFunctionImplementation impl = getAggregation(BIGINT, VARCHAR, INTEGER);
assertAggregation(impl,
null,
toBlocks(
record(0, DELETE, 1)));
}
@Test
public void testEmpty()
{
JavaAggregationFunctionImplementation impl = getAggregation(BIGINT, VARCHAR, INTEGER);
assertAggregation(impl,
null,
toBlocks());
}
@Test
public void testMultiUpdate()
{
JavaAggregationFunctionImplementation impl = getAggregation(BIGINT, VARCHAR, INTEGER);
assertAggregation(impl,
5,
toBlocks(
record(0, DELETE, 1),
record(1, INSERT, 2),
record(2, DELETE, 2),
record(2, INSERT, 3),
record(3, DELETE, 3),
record(4, INSERT, 5)));
}
private static class ChangelogRecord
{
private final long ordinal;
private final String operation;
private final int recordData;
private ChangelogRecord(long ordinal, ChangelogOperation operation, int recordData)
{
this.ordinal = ordinal;
this.operation = operation.name();
this.recordData = recordData;
}
}
private static ChangelogRecord record(long ordinal, ChangelogOperation operation, int data)
{
return new ChangelogRecord(ordinal, operation, data);
}
private static Block[] toBlocks(ChangelogRecord... changelogRecords)
{
List<ChangelogRecord> records = Arrays.asList(changelogRecords);
BlockBuilder ordinalBlock = BIGINT.createFixedSizeBlockBuilder(records.size());
BlockBuilder opBlock = VARCHAR.createBlockBuilder(null, records.size());
BlockBuilder valueBlock = INTEGER.createFixedSizeBlockBuilder(records.size());
records.forEach(record -> {
ordinalBlock.writeLong(record.ordinal);
Slice op = Slices.utf8Slice(record.operation);
opBlock.writeBytes(op, 0, op.length()).closeEntry();
valueBlock.writeInt(record.recordData);
});
ordinalBlock.closeEntry();
valueBlock.closeEntry();
return new Block[] {ordinalBlock.build(), opBlock.build(), valueBlock.build()};
}
private JavaAggregationFunctionImplementation getAggregation(Type... arguments)
{
return FUNCTION_AND_TYPE_MANAGER.getJavaAggregateFunctionImplementation(FUNCTION_AND_TYPE_MANAGER.lookupFunction(NAME, fromTypes(arguments)));
}
}