PinotSegmentPageSource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.pinot.query.PinotProxyGrpcRequestBuilder;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.connector.presto.grpc.PinotStreamingQueryClient;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.CommonConstants;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.JsonType.JSON;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_DATA_FETCH_EXCEPTION;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_INVALID_SEGMENT_QUERY_GENERATED;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_INVALID_SQL_GENERATED;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNEXPECTED_RESPONSE;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.Objects.requireNonNull;

/**
 * This class retrieves Pinot data from a Pinot client, and re-constructs the data into Presto Pages.
 */

public class PinotSegmentPageSource
        implements ConnectorPageSource
{
    // Columns requested by the scan; the i-th handle is written to the i-th block of each page.
    protected final List<PinotColumnHandle> columnHandles;
    // Block type for each entry of columnHandles, derived through getTypeForBlock.
    protected final List<Type> columnTypes;
    protected final PinotConfig pinotConfig;
    // The segment split this source reads; carries the generated query and the gRPC endpoint.
    protected final PinotSplit split;
    protected final ConnectorSession session;
    private final PinotStreamingQueryClient pinotStreamingQueryClient;
    // Created lazily on the first getNextPage() call; reset to null once the METADATA response is seen.
    private Iterator<Server.ServerResponse> serverResponseIterator;
    // Total number of rows turned into pages so far.
    private long completedPositions;

    // Running total of bytes written into block builders by the write*Block methods.
    protected long completedBytes;
    // Accumulated time spent fetching responses from the server response iterator.
    protected long readTimeNanos;
    // Serialized size of the most recently received DATA response.
    protected long estimatedMemoryUsageInBytes;
    // The most recently decoded data table; the write*Block and get* helpers read from it.
    protected PinotDataTableWithSize currentDataTable;
    protected boolean closed;

    public PinotSegmentPageSource(
            ConnectorSession session,
            PinotConfig pinotConfig,
            PinotStreamingQueryClient pinotStreamingQueryClient,
            PinotSplit split,
            List<PinotColumnHandle> columnHandles)
    {
        this.pinotConfig = requireNonNull(pinotConfig, "pinotConfig is null");
        this.split = requireNonNull(split, "split is null");
        this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
        this.session = requireNonNull(session, "session is null");
        this.columnTypes = columnHandles.stream()
                .map(PinotSegmentPageSource::getTypeForBlock)
                .collect(Collectors.toList());
        this.pinotStreamingQueryClient = requireNonNull(pinotStreamingQueryClient, "pinotStreamingQueryClient is null");
    }

    /**
     * Validates a data table received from Pinot.
     *
     * <p>Fails if the table metadata carries any entry whose key starts with
     * {@code DataTable.EXCEPTION_METADATA_KEY}, or if the number of columns in the
     * table schema differs from the number of expected column handles of the split.
     *
     * @param dataTable the table to validate
     * @param split the split the table was fetched for
     * @param markDataFetchExceptionsAsRetriable when true, server-side exceptions are reported with
     *        {@code PINOT_DATA_FETCH_EXCEPTION} instead of {@code PINOT_EXCEPTION}
     * @throws PinotException if either check fails
     */
    public static void checkExceptions(DataTable dataTable, PinotSplit split, boolean markDataFetchExceptionsAsRetriable)
    {
        // Collect every exception message Pinot attached to the table metadata.
        List<String> pinotErrors = new ArrayList<>();
        for (Map.Entry<String, String> metadataEntry : dataTable.getMetadata().entrySet()) {
            if (metadataEntry.getKey().startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
                pinotErrors.add(metadataEntry.getValue());
            }
        }
        if (!pinotErrors.isEmpty()) {
            throw new PinotException(
                    markDataFetchExceptionsAsRetriable ? PINOT_DATA_FETCH_EXCEPTION : PINOT_EXCEPTION,
                    split.getSegmentPinotQuery(),
                    String.format("Encountered %d pinot exceptions for split %s: %s", pinotErrors.size(), split, pinotErrors));
        }

        // The schema width must match the columns of the generated segment query.
        int expectedWidth = split.getExpectedColumnHandles().size();
        int actualWidth = dataTable.getDataSchema().size();
        if (expectedWidth == actualWidth) {
            return;
        }
        throw new PinotException(
                PINOT_EXCEPTION,
                split.getSegmentPinotQuery(),
                String.format("Expected pinot to contain %d columns but got %d: %s", expectedWidth, actualWidth, dataTable.getDataSchema()));
    }

    /**
     * @return the running byte total accumulated while writing values into blocks.
     */
    @Override
    public long getCompletedBytes()
    {
        return this.completedBytes;
    }

    /**
     * @return the accumulated time, in nanoseconds, spent fetching server responses.
     */
    @Override
    public long getReadTimeNanos()
    {
        return this.readTimeNanos;
    }

    /**
     * @return the current memory estimate, which is the serialized size of the latest DATA response.
     */
    @Override
    public long getSystemMemoryUsage()
    {
        return this.estimatedMemoryUsageInBytes;
    }

    /**
     * Reports whether this page source has been closed; a closed source produces no more pages.
     *
     * @return true once the source is closed.
     */
    @Override
    public boolean isFinished()
    {
        return this.closed;
    }

    /**
     * Converts the current data table into a Presto page holding the requested columns.
     *
     * @return a page with one block per requested column handle
     * @throws PinotException if a requested handle is not among the handles of the generated segment query
     */
    private Page fillNextPage()
    {
        // Handles matching the generated SQL; may be a superset or permutation of what the scan asked for.
        List<PinotColumnHandle> handlesInGeneratedSql = split.getExpectedColumnHandles();
        PageBuilder builder = new PageBuilder(columnTypes);
        // The declared position count must equal the number of rows written to every block.
        builder.declarePositions(currentDataTable.getDataTable().getNumberOfRows());

        int requestedColumnCount = columnHandles.size();
        for (int channel = 0; channel < requestedColumnCount; channel++) {
            PinotColumnHandle requestedHandle = columnHandles.get(channel);

            // Locate the requested column within the Pinot result; extra result columns are simply ignored.
            int pinotColumnIndex = handlesInGeneratedSql.indexOf(requestedHandle);
            if (pinotColumnIndex < 0) {
                throw new PinotException(
                        PINOT_INVALID_SQL_GENERATED,
                        split.getSegmentPinotQuery(),
                        String.format("Expected column handle %s to be present in the handles %s corresponding to the segment Pinot SQL", requestedHandle, handlesInGeneratedSql));
            }

            // Blocks are emitted in the order the scan requested them.
            writeBlock(builder.getBlockBuilder(channel), columnTypes.get(channel), pinotColumnIndex);
        }
        return builder.build();
    }

    /**
     * Fetches the next server response and converts it into a page.
     *
     * <p>The query is submitted lazily on the first call. Each DATA response is decoded,
     * validated with {@link #checkExceptions} and turned into one page. A METADATA response
     * marks the end of the stream and closes this source. If the stream ends without a
     * METADATA response the source is closed as well, so a previously returned data table
     * is never emitted a second time.
     *
     * @return constructed page for pinot data, or null when there is no more data.
     * @throws PinotException if the payload cannot be decoded, Pinot reported exceptions,
     *         or the response type is unknown
     */
    @Override
    public Page getNextPage()
    {
        if (closed) {
            return null;
        }

        if (serverResponseIterator == null) {
            serverResponseIterator = queryPinot(split);
        }
        ByteBuffer byteBuffer = null;
        try {
            // Pinot gRPC server response iterator returns:
            //   - n data blocks based on inbound message size;
            //   - 1 metadata of the query results.
            // So we need to check ResponseType of each ServerResponse.
            if (!serverResponseIterator.hasNext()) {
                // Nothing new arrived: finish instead of re-reading the stale (or absent) data table.
                currentDataTable = null;
                serverResponseIterator = null;
                close();
                return null;
            }

            long startTimeNanos = System.nanoTime();
            Server.ServerResponse serverResponse = serverResponseIterator.next();
            readTimeNanos += System.nanoTime() - startTimeNanos;
            final String responseType = serverResponse.getMetadataOrThrow("responseType");
            switch (responseType) {
                case CommonConstants.Query.Response.ResponseType.DATA:
                    int serializedSize = serverResponse.getSerializedSize();
                    estimatedMemoryUsageInBytes = serializedSize;
                    // Store each dataTable which will later be constructed into Pages.
                    try {
                        byteBuffer = serverResponse.getPayload().asReadOnlyByteBuffer();
                        DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
                        checkExceptions(dataTable, split, PinotSessionProperties.isMarkDataFetchExceptionsAsRetriable(session));
                        currentDataTable = new PinotSegmentPageSource.PinotDataTableWithSize(dataTable, serializedSize);
                    }
                    catch (IOException e) {
                        throw new PinotException(
                                PINOT_DATA_FETCH_EXCEPTION,
                                split.getSegmentPinotQuery(),
                                String.format("Encountered Pinot exceptions when fetching data table from Split: < %s >", split),
                                e);
                    }
                    break;
                case CommonConstants.Query.Response.ResponseType.METADATA:
                    // The last part of the response is Metadata
                    currentDataTable = null;
                    serverResponseIterator = null;
                    close();
                    return null;
                default:
                    throw new PinotException(
                            PINOT_UNEXPECTED_RESPONSE,
                            split.getSegmentPinotQuery(),
                            String.format("Encountered Pinot exceptions, unknown response type - %s", responseType));
            }

            // Only reached after a DATA response, so currentDataTable is the freshly decoded table.
            Page page = fillNextPage();
            completedPositions += currentDataTable.getDataTable().getNumberOfRows();
            return page;
        }
        finally {
            if (byteBuffer != null) {
                // Reset position/limit of the payload view; invoked through Buffer so the call resolves to Buffer.clear().
                ((Buffer) byteBuffer).clear();
            }
        }
    }

    /**
     * Submits the segment query of the given split and returns the streamed server responses.
     *
     * <p>When the proxy is enabled in the configuration, the request is sent to the configured
     * gRPC endpoint and the split's own host and port are set on the request builder; otherwise
     * the request goes straight to the split's host and port.
     *
     * @param split the split carrying the query, segments and gRPC endpoint
     * @return iterator over the server responses
     * @throws PinotException if the split lacks the query, host or port, or the port is not positive
     */
    private Iterator<Server.ServerResponse> queryPinot(PinotSplit split)
    {
        String segmentSql = split.getSegmentPinotQuery().orElseThrow(() -> new PinotException(PINOT_INVALID_SEGMENT_QUERY_GENERATED, Optional.empty(), "Expected the segment split to contain the pinot query"));
        String serverHost = split.getGrpcHost().orElseThrow(() -> new PinotException(PINOT_INVALID_SEGMENT_QUERY_GENERATED, Optional.empty(), "Expected the segment split to contain the grpc host"));
        int serverPort = split.getGrpcPort().orElseThrow(() -> new PinotException(PINOT_INVALID_SEGMENT_QUERY_GENERATED, Optional.empty(), "Expected the segment split to contain the grpc port"));
        if (serverPort <= 0) {
            throw new PinotException(
                    PINOT_INVALID_SEGMENT_QUERY_GENERATED,
                    Optional.empty(),
                    "Expected the grpc port > 0 always");
        }

        PinotProxyGrpcRequestBuilder requestBuilder = new PinotProxyGrpcRequestBuilder()
                .setSegments(split.getSegments())
                .setEnableStreaming(true)
                .setBrokerId("presto-coordinator-grpc")
                .addExtraMetadata(pinotConfig.getExtraGrpcMetadata())
                .setSql(segmentSql);

        // Direct mode: talk to the server named in the split.
        if (!pinotConfig.isUseProxy()) {
            return pinotStreamingQueryClient.submit(serverHost, serverPort, requestBuilder);
        }

        // Proxy mode: the request records the real target, the call itself goes to the configured endpoint.
        requestBuilder.setHostName(serverHost).setPort(serverPort);
        return pinotStreamingQueryClient.submit(
                pinotConfig.getGrpcHost(),
                pinotConfig.getGrpcPort(),
                requestBuilder);
    }

    /**
     * @return the total number of rows converted into pages so far.
     */
    @Override
    public long getCompletedPositions()
    {
        return this.completedPositions;
    }

    /**
     * Marks this page source as closed. Calling it again has no further effect.
     */
    @Override
    public void close()
    {
        // NOTE(review): only the flag is set here; the server response iterator is not touched.
        closed = true;
    }

    /**
     * Fills the block of one column from the {@link #currentDataTable}.
     *
     * <p>The writer is selected from the Presto column type and, where needed, the Pinot column
     * type of the result: array types first, then boolean, long (timestamps handled separately),
     * double, Pinot BIG_DECIMAL and finally slice-backed types.
     *
     * @param blockBuilder blockBuilder for the current column
     * @param columnType type of the column
     * @param columnIndex index of the column within the Pinot result
     * @throws PrestoException if no writer exists for the type combination
     */
    private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        Class<?> javaType = columnType.getJavaType();
        DataSchema.ColumnDataType pinotColumnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);

        if (columnType instanceof ArrayType) {
            writeArrayBlock(blockBuilder, columnType, columnIndex);
            return;
        }
        if (javaType.equals(boolean.class)) {
            writeBooleanBlock(blockBuilder, columnType, columnIndex);
            return;
        }
        if (javaType.equals(long.class)) {
            boolean pinotTimestamp = pinotColumnType.toDataType().equals(FieldSpec.DataType.TIMESTAMP);
            if (pinotTimestamp) {
                writeTimestampBlock(blockBuilder, columnType, columnIndex);
            }
            else {
                writeLongBlock(blockBuilder, columnType, columnIndex);
            }
            return;
        }
        if (javaType.equals(double.class)) {
            writeDoubleBlock(blockBuilder, columnType, columnIndex);
            return;
        }
        // The BIG_DECIMAL check must precede the generic Slice branch.
        if (pinotColumnType == DataSchema.ColumnDataType.BIG_DECIMAL) {
            writeBigDecimalBlock(blockBuilder, columnType, columnIndex);
            return;
        }
        if (javaType.equals(Slice.class)) {
            writeSliceBlock(blockBuilder, columnType, columnIndex);
            return;
        }

        throw new PrestoException(
                PINOT_UNSUPPORTED_COLUMN_TYPE,
                String.format(
                        "Failed to write column %s. pinotColumnType %s, javaType %s",
                        split.getExpectedColumnHandles().get(columnIndex).getColumnName(),
                        pinotColumnType,
                        javaType));
    }

    /**
     * Writes one array entry per row of the current data table into the given block builder.
     *
     * <p>The element writer is chosen from the Pinot column type. FLOAT and DOUBLE arrays are
     * truncated to long when the Presto element type is long-backed. {@code completedBytes}
     * is advanced for every element written.
     *
     * @param blockBuilder builder of the array column
     * @param columnType Presto type of the column; must be an {@link ArrayType}
     * @param columnIndex index of the column within the Pinot result
     * @throws PrestoException if the Pinot column type is not a supported array type
     */
    private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        // These lookups do not depend on the row, so resolve them once.
        DataTable dataTable = currentDataTable.getDataTable();
        int rowCount = dataTable.getNumberOfRows();
        DataSchema.ColumnDataType columnPinotType = dataTable.getDataSchema().getColumnDataType(columnIndex);
        Type columnPrestoType = ((ArrayType) columnType).getElementType();

        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            BlockBuilder childBuilder = blockBuilder.beginBlockEntry();
            switch (columnPinotType) {
                case BOOLEAN_ARRAY:
                    // The table exposes boolean arrays as ints; any positive value is treated as true.
                    for (int value : dataTable.getIntArray(rowIndex, columnIndex)) {
                        columnPrestoType.writeBoolean(childBuilder, value > 0);
                        completedBytes += 1;
                    }
                    break;
                case INT_ARRAY:
                    for (int value : dataTable.getIntArray(rowIndex, columnIndex)) {
                        // Both the numeric types implement a writeLong method which write if the bounds for
                        // the type allows else throw exception.
                        columnPrestoType.writeLong(childBuilder, value);
                        completedBytes += Long.BYTES;
                    }
                    break;
                case LONG_ARRAY:
                case TIMESTAMP_ARRAY:
                    for (long value : dataTable.getLongArray(rowIndex, columnIndex)) {
                        columnPrestoType.writeLong(childBuilder, value);
                        completedBytes += Long.BYTES;
                    }
                    break;
                case FLOAT_ARRAY:
                    float[] floatArray = dataTable.getFloatArray(rowIndex, columnIndex);
                    if (columnPrestoType.getJavaType().equals(long.class)) {
                        for (float value : floatArray) {
                            columnPrestoType.writeLong(childBuilder, (long) value);
                            completedBytes += Long.BYTES;
                        }
                    }
                    else {
                        for (float value : floatArray) {
                            columnPrestoType.writeDouble(childBuilder, value);
                            completedBytes += Double.BYTES;
                        }
                    }
                    break;
                case DOUBLE_ARRAY:
                    double[] doubleArray = dataTable.getDoubleArray(rowIndex, columnIndex);
                    if (columnPrestoType.getJavaType().equals(long.class)) {
                        for (double value : doubleArray) {
                            columnPrestoType.writeLong(childBuilder, (long) value);
                            completedBytes += Long.BYTES;
                        }
                    }
                    else {
                        for (double value : doubleArray) {
                            columnPrestoType.writeDouble(childBuilder, value);
                            completedBytes += Double.BYTES;
                        }
                    }
                    break;
                case STRING_ARRAY:
                case BYTES_ARRAY:
                    for (String value : dataTable.getStringArray(rowIndex, columnIndex)) {
                        Slice slice = Slices.utf8Slice(value);
                        childBuilder.writeBytes(slice, 0, slice.length()).closeEntry();
                        // length() gives the byte count without copying the slice contents.
                        completedBytes += slice.length();
                    }
                    break;
                default:
                    throw new PrestoException(
                            PINOT_UNSUPPORTED_COLUMN_TYPE,
                            String.format(
                                    "Failed to write column %s. pinotColumnType %s, prestoType %s",
                                    split.getExpectedColumnHandles().get(columnIndex).getColumnName(),
                                    columnPinotType,
                                    columnPrestoType));
            }
            blockBuilder.closeEntry();
        }
    }

    /**
     * Writes every row of a boolean column into the block builder, counting one byte per value.
     */
    private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
        for (int row = 0; row < rowCount; row++) {
            boolean value = getBoolean(row, columnIndex);
            columnType.writeBoolean(blockBuilder, value);
            completedBytes += 1;
        }
    }

    /**
     * Writes every row of a long-backed column into the block builder, counting {@code Long.BYTES} per value.
     */
    private void writeLongBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
        for (int row = 0; row < rowCount; row++) {
            long value = getLong(row, columnIndex);
            columnType.writeLong(blockBuilder, value);
            completedBytes += Long.BYTES;
        }
    }

    /**
     * Writes every row of a timestamp column into the block builder as a long value,
     * counting {@code Long.BYTES} per value.
     */
    private void writeTimestampBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
        for (int row = 0; row < rowCount; row++) {
            long timestampValue = getLong(row, columnIndex);
            columnType.writeLong(blockBuilder, timestampValue);
            completedBytes += Long.BYTES;
        }
    }

    /**
     * Writes every row of a double-backed column into the block builder, counting {@code Double.BYTES} per value.
     */
    private void writeDoubleBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
        for (int row = 0; row < rowCount; row++) {
            double value = getDouble(row, columnIndex);
            columnType.writeDouble(blockBuilder, value);
            completedBytes += Double.BYTES;
        }
    }

    /**
     * Writes every row of a Pinot BIG_DECIMAL column into the block builder as an encoded decimal slice.
     */
    private void writeBigDecimalBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
        for (int row = 0; row < rowCount; row++) {
            BigDecimal decimalValue = getBigDecimal(row, columnIndex);
            // NOTE(review): the value is encoded as-is; confirm its scale always matches the scale of columnType.
            Slice encoded = Decimals.encodeScaledValue(decimalValue);
            columnType.writeSlice(blockBuilder, encoded, 0, encoded.length());
            completedBytes += encoded.length();
        }
    }

    /**
     * Writes every row of a slice-backed column (for example string, JSON or bytes data)
     * into the block builder and adds each value's byte length to {@code completedBytes}.
     *
     * @param blockBuilder builder of the column
     * @param columnType Presto type of the column
     * @param columnIndex index of the column within the Pinot result
     */
    private void writeSliceBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
    {
        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
        for (int i = 0; i < rowCount; i++) {
            Slice slice = getSlice(i, columnIndex);
            int length = slice.length();
            columnType.writeSlice(blockBuilder, slice, 0, length);
            // Use the slice length directly; getBytes() would copy the whole value just to measure it.
            completedBytes += length;
        }
    }

    /**
     * Reads a boolean cell; the table stores it as an int and any positive value counts as true.
     */
    private boolean getBoolean(int rowIndex, int columnIndex)
    {
        int rawValue = currentDataTable.getDataTable().getInt(rowIndex, columnIndex);
        return rawValue > 0;
    }

    /**
     * Reads a cell as a long, converting from the type Pinot actually returned.
     *
     * <p>The type in the data table can differ from the type in the column handle, e.g. an
     * aggregation over an int/long column comes back as double, so DOUBLE and INT results are
     * cast to long; every other type is read with {@code getLong}.
     */
    private long getLong(int rowIndex, int columnIndex)
    {
        DataTable table = currentDataTable.getDataTable();
        switch (table.getDataSchema().getColumnDataType(columnIndex)) {
            case DOUBLE:
                return (long) table.getDouble(rowIndex, columnIndex);
            case INT:
                return table.getInt(rowIndex, columnIndex);
            default:
                return table.getLong(rowIndex, columnIndex);
        }
    }

    /**
     * Reads a cell as a double; FLOAT columns are read as float and widened, all others are read as double.
     */
    private double getDouble(int rowIndex, int columnIndex)
    {
        DataTable table = currentDataTable.getDataTable();
        boolean storedAsFloat = table.getDataSchema().getColumnDataType(columnIndex).equals(DataSchema.ColumnDataType.FLOAT);
        return storedAsFloat
                ? table.getFloat(rowIndex, columnIndex)
                : table.getDouble(rowIndex, columnIndex);
    }

    /**
     * Reads a cell of the current data table as a {@link BigDecimal}.
     */
    private BigDecimal getBigDecimal(int rowIndex, int columnIndex)
    {
        DataTable table = currentDataTable.getDataTable();
        return table.getBigDecimal(rowIndex, columnIndex);
    }

    /**
     * Reads a cell as a {@link Slice}.
     *
     * <p>The expected column type must be VARCHAR, JSON or VARBINARY. Array values are rendered
     * with {@code Arrays.toString}, string and JSON values are UTF-8 encoded (null or empty
     * becomes the empty slice), and bytes values wrap the underlying byte array. Any other
     * Pinot type yields the empty slice.
     */
    private Slice getSlice(int rowIndex, int columnIndex)
    {
        checkColumnType(columnIndex, new Type[] {
                VARCHAR, JSON, VARBINARY
        });
        DataTable table = currentDataTable.getDataTable();
        DataSchema.ColumnDataType pinotType = table.getDataSchema().getColumnDataType(columnIndex);
        switch (pinotType) {
            case INT_ARRAY:
                return utf8Slice(Arrays.toString(table.getIntArray(rowIndex, columnIndex)));
            case LONG_ARRAY:
                return utf8Slice(Arrays.toString(table.getLongArray(rowIndex, columnIndex)));
            case FLOAT_ARRAY:
                return utf8Slice(Arrays.toString(table.getFloatArray(rowIndex, columnIndex)));
            case DOUBLE_ARRAY:
                return utf8Slice(Arrays.toString(table.getDoubleArray(rowIndex, columnIndex)));
            case STRING_ARRAY:
                return utf8Slice(Arrays.toString(table.getStringArray(rowIndex, columnIndex)));
            case STRING:
            case JSON:
                String text = table.getString(rowIndex, columnIndex);
                boolean hasText = text != null && !text.isEmpty();
                return hasText ? Slices.utf8Slice(text) : Slices.EMPTY_SLICE;
            case BYTES:
                ByteArray bytes = table.getBytes(rowIndex, columnIndex);
                return Slices.wrappedBuffer(bytes.getBytes());
            default:
                // Types without a dedicated conversion fall back to an empty value.
                return Slices.EMPTY_SLICE;
        }
    }

    /**
     * Estimates the per-value size in bytes of a Pinot column.
     * Numeric types map to their primitive width (anything numeric other than LONG, FLOAT
     * or DOUBLE counts as an int); non-numeric types use the configured estimate.
     *
     * @param dataType column data type of the Pinot column.
     * @return estimated size in bytes.
     */
    private int getEstimatedColumnSizeInBytes(DataSchema.ColumnDataType dataType)
    {
        // Non-numeric values have no fixed width, so rely on the configured estimate.
        if (!dataType.isNumber()) {
            return pinotConfig.getEstimatedSizeInBytesForNonNumericColumn();
        }

        int width;
        switch (dataType) {
            case LONG:
                width = Long.BYTES;
                break;
            case FLOAT:
                width = Float.BYTES;
                break;
            case DOUBLE:
                width = Double.BYTES;
                break;
            default:
                width = Integer.BYTES;
                break;
        }
        return width;
    }

    /**
     * Verifies that the expected column at the given index has one of the given Presto types.
     *
     * <p>This runs once per cell when reading slice values, so the failure message is only
     * built when the check actually fails.
     *
     * @param columnIndex index into the split's expected column handles
     * @param expectedTypes the acceptable types
     * @throws IllegalArgumentException if the index is out of range or the column type matches none of the expected types
     */
    private void checkColumnType(int columnIndex, Type[] expectedTypes)
    {
        List<PinotColumnHandle> expectedColumnHandles = split.getExpectedColumnHandles();
        checkArgument(columnIndex >= 0 && columnIndex < expectedColumnHandles.size(), "Invalid field index");
        Type actual = expectedColumnHandles.get(columnIndex).getDataType();
        for (Type expectedType : expectedTypes) {
            if (actual.equals(expectedType)) {
                // First match is enough; no need to look at the remaining candidates.
                return;
            }
        }
        throw new IllegalArgumentException(String.format(
                "Expected column %s to be type %s but is %s",
                columnIndex,
                Arrays.toString(expectedTypes),
                actual));
    }

    /**
     * Picks the Presto type used for the block of a column: INTEGER columns are
     * written as BIGINT, every other type is used unchanged.
     *
     * @param pinotColumnHandle the column handle
     * @return the type used when building blocks for the column
     */
    protected static Type getTypeForBlock(PinotColumnHandle pinotColumnHandle)
    {
        Type declaredType = pinotColumnHandle.getDataType();
        return declaredType.equals(INTEGER) ? BIGINT : declaredType;
    }

    /**
     * Pairs a Pinot {@link DataTable} with a size estimate in bytes supplied by the creator.
     */
    protected static class PinotDataTableWithSize
    {
        // The decoded table.
        DataTable dataTable;
        // Size estimate handed in at construction time.
        int estimatedSizeInBytes;

        /**
         * @param dataTable the decoded table
         * @param estimatedSizeInBytes size estimate for the table, in bytes
         */
        PinotDataTableWithSize(DataTable dataTable, int estimatedSizeInBytes)
        {
            this.estimatedSizeInBytes = estimatedSizeInBytes;
            this.dataTable = dataTable;
        }

        /**
         * @return the size estimate given at construction.
         */
        int getEstimatedSizeInBytes()
        {
            return this.estimatedSizeInBytes;
        }

        /**
         * @return the wrapped data table.
         */
        DataTable getDataTable()
        {
            return this.dataTable;
        }
    }
}