HiveFileSystemTestUtils.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction;
import com.facebook.presto.hive.AbstractTestHiveClient.Transaction;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
import static com.facebook.presto.hive.AbstractTestHiveFileSystem.SPLIT_SCHEDULING_CONTEXT;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE;
import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;

public class HiveFileSystemTestUtils
{
    private HiveFileSystemTestUtils() {}

    public static MaterializedResult readTable(SchemaTableName tableName,
            HiveTransactionManager transactionManager,
            HiveClientConfig config,
            HiveMetadataFactory metadataFactory,
            ConnectorPageSourceProvider pageSourceProvider,
            ConnectorSplitManager splitManager)
            throws IOException
    {
        ConnectorMetadata metadata = null;
        ConnectorSession session = null;
        ConnectorSplitSource splitSource = null;

        try (Transaction transaction = newTransaction(transactionManager, metadataFactory.get())) {
            metadata = transaction.getMetadata();
            session = newSession(config);

            ConnectorTableHandle table = getTableHandle(metadata, tableName, session);
            List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
            List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
            HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
            TableHandle tableHandle = new TableHandle(new ConnectorId(tableName.getSchemaName()), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

            metadata.beginQuery(session);

            splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle.getLayout().get(), SPLIT_SCHEDULING_CONTEXT);

            List<Type> allTypes = getTypes(columnHandles);
            List<Type> dataTypes = getTypes(columnHandles.stream()
                    .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
                    .collect(toImmutableList()));
            MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);

            List<ConnectorSplit> splits = getAllSplits(splitSource);
            for (ConnectorSplit split : splits) {
                try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(
                        transaction.getTransactionHandle(),
                        session,
                        split,
                        tableHandle.getLayout().get(),
                        columnHandles,
                        NON_CACHEABLE,
                        new RuntimeStats())) {
                    MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
                    for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
                        Object[] dataValues = IntStream.range(0, row.getFieldCount())
                                .filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden())
                                .mapToObj(row::getField)
                                .toArray();
                        result.row(dataValues);
                    }
                }
            }
            return result.build();
        }
        finally {
            cleanUpQuery(metadata, session);
            closeQuietly(splitSource);
        }
    }

    public static MaterializedResult filterTable(SchemaTableName tableName,
            List<ColumnHandle> projectedColumns,
            HiveTransactionManager transactionManager,
            HiveClientConfig config,
            HiveMetadataFactory metadataFactory,
            ConnectorPageSourceProvider pageSourceProvider,
            ConnectorSplitManager splitManager)
            throws IOException
    {
        ConnectorMetadata metadata = null;
        ConnectorSession session = null;
        ConnectorSplitSource splitSource = null;

        try (Transaction transaction = newTransaction(transactionManager, metadataFactory.get())) {
            metadata = transaction.getMetadata();
            session = newSession(config);

            ConnectorTableHandle table = getTableHandle(metadata, tableName, session);
            List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
            HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
            TableHandle tableHandle = new TableHandle(new ConnectorId(tableName.getSchemaName()), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

            metadata.beginQuery(session);
            splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle.getLayout().get(), SPLIT_SCHEDULING_CONTEXT);

            List<Type> allTypes = getTypes(projectedColumns);
            List<Type> dataTypes = getTypes(projectedColumns.stream()
                    .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
                    .collect(toImmutableList()));
            MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);

            List<ConnectorSplit> splits = getAllSplits(splitSource);
            for (ConnectorSplit split : splits) {
                try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(
                        transaction.getTransactionHandle(),
                        session,
                        split,
                        tableHandle.getLayout().get(),
                        projectedColumns,
                        NON_CACHEABLE,
                        new RuntimeStats())) {
                    MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
                    for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
                        Object[] dataValues = IntStream.range(0, row.getFieldCount())
                                .filter(channel -> !((HiveColumnHandle) projectedColumns.get(channel)).isHidden())
                                .mapToObj(row::getField)
                                .toArray();
                        result.row(dataValues);
                    }
                }
            }
            return result.build();
        }
        finally {
            cleanUpQuery(metadata, session);
            closeQuietly(splitSource);
        }
    }

    public static int getSplitsCount(SchemaTableName tableName,
            HiveTransactionManager transactionManager,
            HiveClientConfig config,
            HiveMetadataFactory metadataFactory,
            ConnectorSplitManager splitManager)
    {
        ConnectorMetadata metadata = null;
        ConnectorSession session = null;
        ConnectorSplitSource splitSource = null;

        try (Transaction transaction = newTransaction(transactionManager, metadataFactory.get())) {
            metadata = transaction.getMetadata();
            session = newSession(config);

            ConnectorTableHandle table = getTableHandle(metadata, tableName, session);
            List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
            HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
            TableHandle tableHandle = new TableHandle(new ConnectorId(tableName.getSchemaName()), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

            metadata.beginQuery(session);
            splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle.getLayout().get(), SPLIT_SCHEDULING_CONTEXT);

            return getAllSplits(splitSource).size();
        }
        finally {
            cleanUpQuery(metadata, session);
            closeQuietly(splitSource);
        }
    }

    public static Transaction newTransaction(HiveTransactionManager transactionManager, HiveMetadata hiveMetadata)
    {
        return new HiveTransaction(transactionManager, hiveMetadata);
    }

    public static ConnectorSession newSession(HiveClientConfig config)
    {
        return new TestingConnectorSession(getAllSessionProperties(config, new HiveCommonClientConfig()));
    }

    public static ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName, ConnectorSession session)
    {
        ConnectorTableHandle handle = metadata.getTableHandle(session, tableName);
        checkArgument(handle != null, "table not found: %s", tableName);
        return handle;
    }

    private static void closeQuietly(Closeable closeable)
    {
        try {
            if (closeable != null) {
                closeable.close();
            }
        }
        catch (IOException ignored) {
        }
    }

    private static void cleanUpQuery(ConnectorMetadata metadata, ConnectorSession session)
    {
        if (metadata != null && session != null) {
            metadata.cleanupQuery(session);
        }
    }
}