AbstractRowEncoder.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka.encoder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.SqlDate;
import com.facebook.presto.common.type.SqlTime;
import com.facebook.presto.common.type.SqlTimeWithTimeZone;
import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.common.type.SqlTimestampWithTimeZone;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import java.nio.ByteBuffer;
import java.util.List;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.isVarbinaryType;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
public abstract class AbstractRowEncoder
implements RowEncoder
{
protected final ConnectorSession session;
protected final List<EncoderColumnHandle> columnHandles;
/**
* The current column index for appending values to the row encoder.
* Gets incremented by appendColumnValue and set back to zero when the encoder is reset.
*/
protected int currentColumnIndex;
protected AbstractRowEncoder(ConnectorSession session, List<EncoderColumnHandle> columnHandles)
{
this.session = requireNonNull(session, "session is null");
requireNonNull(columnHandles, "columnHandles is null");
this.columnHandles = ImmutableList.copyOf(columnHandles);
this.currentColumnIndex = 0;
}
@Override
public void appendColumnValue(Block block, int position)
{
checkArgument(currentColumnIndex < columnHandles.size(), format("currentColumnIndex '%d' is greater than number of columns '%d'", currentColumnIndex, columnHandles.size()));
Type type = columnHandles.get(currentColumnIndex).getType();
if (block.isNull(position)) {
appendNullValue();
}
else if (type == BOOLEAN) {
appendBoolean(type.getBoolean(block, position));
}
else if (type == BIGINT) {
appendLong(type.getLong(block, position));
}
else if (type == INTEGER) {
appendInt(toIntExact(type.getLong(block, position)));
}
else if (type == SMALLINT) {
appendShort(Shorts.checkedCast(type.getLong(block, position)));
}
else if (type == TINYINT) {
appendByte(SignedBytes.checkedCast(type.getLong(block, position)));
}
else if (type.equals(DOUBLE)) {
appendDouble(type.getDouble(block, position));
}
else if (type.equals(REAL)) {
appendFloat(intBitsToFloat(toIntExact(type.getLong(block, position))));
}
else if (isVarcharType(type)) {
appendString(type.getSlice(block, position).toStringUtf8());
}
else if (isVarbinaryType(type)) {
appendByteBuffer(type.getSlice(block, position).toByteBuffer());
}
else if (type == DATE) {
appendSqlDate((SqlDate) type.getObjectValue(session.getSqlFunctionProperties(), block, position));
}
else if (type == TIME) {
appendSqlTime((SqlTime) type.getObjectValue(session.getSqlFunctionProperties(), block, position));
}
else if (type == TIME_WITH_TIME_ZONE) {
appendSqlTimeWithTimeZone((SqlTimeWithTimeZone) type.getObjectValue(session.getSqlFunctionProperties(), block, position));
}
else if (type instanceof TimestampType) {
appendSqlTimestamp((SqlTimestamp) type.getObjectValue(session.getSqlFunctionProperties(), block, position));
}
else if (type instanceof TimestampWithTimeZoneType) {
appendSqlTimestampWithTimeZone((SqlTimestampWithTimeZone) type.getObjectValue(session.getSqlFunctionProperties(), block, position));
}
else {
throw new UnsupportedOperationException(format("Column '%s' does not support 'null' value", columnHandles.get(currentColumnIndex).getName()));
}
currentColumnIndex++;
}
// these append value methods should be overridden for each row encoder
// only the methods with types supported by the data format should be overridden
protected void appendNullValue()
{
throw new UnsupportedOperationException(format("Column '%s' does not support 'null' value", columnHandles.get(currentColumnIndex).getName()));
}
protected void appendLong(long value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", long.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendInt(int value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", int.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendShort(short value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", short.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendByte(byte value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", byte.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendDouble(double value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", double.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendFloat(float value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", float.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendBoolean(boolean value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", boolean.class.getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendString(String value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendByteBuffer(ByteBuffer value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendSqlDate(SqlDate value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendSqlTime(SqlTime value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendSqlTimeWithTimeZone(SqlTimeWithTimeZone value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendSqlTimestamp(SqlTimestamp value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void appendSqlTimestampWithTimeZone(SqlTimestampWithTimeZone value)
{
throw new UnsupportedOperationException(format("Unsupported type '%s' for column '%s'", value.getClass().getName(), columnHandles.get(currentColumnIndex).getName()));
}
protected void resetColumnIndex()
{
currentColumnIndex = 0;
}
@Override
public void close()
{
}
}