ChangelogRecord.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.function.changelog;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.RowBlock;
import com.facebook.presto.common.block.SingleRowBlock;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.openjdk.jol.info.ClassLayout;
import java.util.Locale;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.INSERT;
import static java.util.Objects.requireNonNull;
/**
 * Mutable holder for the winning changelog entry (ordinal, operation name and row value)
 * among all entries folded into it via {@link #add} / {@link #merge}.
 *
 * <p>The entry with the highest ordinal wins; ties are resolved by operation type in
 * {@link #merge}. The state round-trips through an anonymous
 * {@code ROW(BIGINT, VARCHAR, <stateType>)} via {@link #serialize} / {@link #deserialize}.
 */
public class ChangelogRecord
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(ChangelogRecord.class).instanceSize();

    // Row value of the winning entry; null until an entry is merged in or deserialized.
    private Block lastRow;
    // Operation name of the winning entry; Slices.EMPTY_SLICE until an entry is merged in or deserialized.
    private Slice lastOperation;
    // Ordinal of the winning entry; -1 until an entry is merged in or deserialized.
    private int lastOrdinal;
    // ROW(BIGINT, VARCHAR, stateType): the layout written by serialize() and read by deserialize().
    private final Type serializedType;
    // Type of the row values tracked in lastRow.
    private final Type stateType;

    private static Type getSerializedRowType(Type inner)
    {
        return RowType.anonymous(ImmutableList.of(BIGINT, VARCHAR, inner));
    }

    /**
     * Parses an operation name into a {@link ChangelogOperation}, case-insensitively.
     * Uses a fixed locale so that e.g. "insert" is not upper-cased to "İNSERT" under a
     * Turkish default locale, which {@code valueOf} would reject.
     */
    private static ChangelogOperation parseOperation(Slice operation)
    {
        return ChangelogOperation.valueOf(operation.toStringUtf8().toUpperCase(Locale.ENGLISH));
    }

    /**
     * Creates an empty record for rows of {@code stateType}.
     *
     * @param stateType type of the tracked row values; must not be null
     */
    public ChangelogRecord(Type stateType)
    {
        this.stateType = requireNonNull(stateType, "type is null");
        this.serializedType = getSerializedRowType(stateType);
        this.lastOperation = Slices.EMPTY_SLICE;
        this.lastOrdinal = -1; // ensures this assumes the values from the first merge.
    }

    /**
     * Creates a record already holding a single entry.
     *
     * @param stateType type of the tracked row values; must not be null
     * @param lastOrdinal ordinal of the entry
     * @param lastOperation operation name of the entry
     * @param lastRow row value of the entry
     */
    public ChangelogRecord(Type stateType, int lastOrdinal, Slice lastOperation, Block lastRow)
    {
        this(stateType);
        this.lastOrdinal = lastOrdinal;
        this.lastOperation = lastOperation;
        this.lastRow = lastRow;
    }

    /**
     * Folds a single entry into this record using the same rules as {@link #merge}.
     * {@code ordinal} is unboxed, so a null value throws {@link NullPointerException}.
     */
    public void add(Integer ordinal, Slice operation, Block row)
    {
        merge(new ChangelogRecord(stateType, ordinal, operation, row));
    }

    /**
     * Returns the row value of the winning entry, or null if none has been recorded.
     */
    public Block getRow()
    {
        return lastRow;
    }

    /**
     * Returns the operation name of the winning entry (empty if none has been recorded).
     */
    public Slice getLastOperation()
    {
        return lastOperation;
    }

    /**
     * Merges {@code other} into this record in place.
     *
     * <ul>
     *   <li>A strictly higher ordinal in {@code other} replaces this record's entry.</li>
     *   <li>With equal ordinals, an INSERT or UPDATE_AFTER in {@code other} replaces this
     *       entry, unless this entry is also an INSERT, which is reported as an error.
     *       UPDATE_BEFORE and DELETE in {@code other} are ignored.</li>
     *   <li>A lower ordinal in {@code other} is ignored.</li>
     * </ul>
     *
     * @param other record to merge in
     * @return this record
     * @throws PrestoException if both entries are INSERTs at the same ordinal, or the
     *         operation of {@code other} is not handled
     */
    public ChangelogRecord merge(ChangelogRecord other)
    {
        if (other.lastOrdinal > lastOrdinal) {
            takeFrom(other);
        }
        else if (other.lastOrdinal == lastOrdinal) {
            if (other.lastOperation.length() == 0) {
                // other still holds the constructor default (never populated), so it
                // contributes nothing; parsing "" would throw IllegalArgumentException.
                return this;
            }
            // ordinals are equal. In the case both operations are inserts, we
            // don't have a way to break ties. Likely an error, throw exception
            ChangelogOperation operation = parseOperation(other.lastOperation);
            switch (operation) {
                case UPDATE_AFTER:
                case INSERT:
                    // An unpopulated (empty) current operation cannot conflict; just adopt other.
                    if (lastOperation.length() > 0 && parseOperation(lastOperation) == INSERT) {
                        throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "unresolvable order for two inserts");
                    }
                    takeFrom(other);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // we don't need to record data before a record update/delete
                    break;
                default:
                    throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "unsupported operation type " + operation);
            }
        }
        return this;
    }

    /**
     * Replaces this record's entry (ordinal, operation and row) with that of {@code other}.
     */
    private void takeFrom(ChangelogRecord other)
    {
        this.lastOperation = other.lastOperation;
        this.lastRow = other.lastRow;
        this.lastOrdinal = other.lastOrdinal;
    }

    /**
     * Appends this record to {@code out} as one {@code ROW(BIGINT, VARCHAR, stateType)}
     * entry: (ordinal, operation, row value at position 0 of {@code lastRow}).
     *
     * <p>NOTE(review): {@code lastRow} is passed straight to {@code appendTo}, so this
     * presumably requires that an entry has been recorded (non-null row) — confirm callers.
     *
     * @param out builder receiving the serialized entry
     */
    public void serialize(BlockBuilder out)
    {
        BlockBuilder entry = out.beginBlockEntry();
        BIGINT.writeLong(entry, lastOrdinal);
        VARCHAR.writeSlice(entry, lastOperation);
        stateType.appendTo(lastRow, 0, entry);
        out.closeEntry();
    }

    /**
     * Sets the inner values of this object to the serialized state contained
     * within the {@link Block} parameter, as written by {@link #serialize}.
     *
     * @param block block containing the serialized state
     * @param index index in the block containing the state
     * @throws PrestoException if the value at {@code index} is not a row block
     */
    public void deserialize(Block block, int index)
    {
        Block serializedStateBlock = block.getSingleValueBlock(index);
        if (!(serializedStateBlock instanceof RowBlock)) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "changelog deserialization must only be row block type");
        }
        SingleRowBlock row = (SingleRowBlock) serializedType.getObject(serializedStateBlock, 0);
        lastOrdinal = (int) row.getLong(0);
        lastOperation = row.getSlice(1, 0, row.getSliceLength(1));
        lastRow = row.getSingleValueBlock(2);
    }

    /**
     * Returns an estimate of this record's memory footprint in bytes: the object layout
     * plus the retained size of the operation slice and the (compacted) size of the row.
     *
     * <p>NOTE(review): {@code INSTANCE_SIZE} from JOL should already include the
     * {@code int lastOrdinal} field, so adding {@code INTEGER.getFixedSize()} may double
     * count it — kept as-is to avoid changing reported memory.
     */
    public long getEstimatedSize()
    {
        long size = INSTANCE_SIZE + IntegerType.INTEGER.getFixedSize();
        if (lastOperation != null) {
            size += lastOperation.getRetainedSize();
        }
        if (lastRow != null) {
            size += lastRow.getSizeInBytes();
        }
        return size;
    }
}