BigQueryMetadata.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.bigquery;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.NotFoundException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.plugin.bigquery.BigQueryErrorCode.BIGQUERY_TABLE_DISAPPEAR_DURING_LIST;
import static com.facebook.presto.plugin.bigquery.Conversions.toColumnMetadata;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

public class BigQueryMetadata
        implements ConnectorMetadata
{
    static final int NUMERIC_DATA_TYPE_PRECISION = 38;
    static final int NUMERIC_DATA_TYPE_SCALE = 9;
    private static final String INFORMATION_SCHEMA = "information_schema";
    private static final Logger log = Logger.get(BigQueryMetadata.class);
    private final BigQueryClient bigQueryClient;
    private final String projectId;

    @Inject
    public BigQueryMetadata(BigQueryClient bigQueryClient, BigQueryConfig config)
    {
        this.bigQueryClient = bigQueryClient;
        this.projectId = config.getProjectId().orElse(bigQueryClient.getProjectId());
    }

    @Override
    public List<String> listSchemaNames(ConnectorSession session)
    {
        return Streams.stream(bigQueryClient.listDatasets(projectId))
                .map(dataset -> dataset.getDatasetId().getDataset())
                .filter(schemaName -> !schemaName.equalsIgnoreCase(INFORMATION_SCHEMA))
                .collect(toImmutableList());
    }

    @Override
    public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
    {
        log.debug("listTables(session=%s, schemaName=%s)", session, schemaName);
        return listTablesWithTypes(session, schemaName, TABLE);
    }

    @Override
    public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
    {
        log.debug("listViews(session=%s, schemaName=%s)", session, schemaName);
        return listTablesWithTypes(session, schemaName, VIEW);
    }

    private List<SchemaTableName> listTablesWithTypes(ConnectorSession session, Optional<String> schemaName, TableDefinition.Type... types)
    {
        if (schemaName.isPresent() && schemaName.get().equalsIgnoreCase(INFORMATION_SCHEMA)) {
            return ImmutableList.of();
        }
        Set<String> schemaNames = schemaName.map(ImmutableSet::of)
                .orElseGet(() -> ImmutableSet.copyOf(listSchemaNames(session)));

        ImmutableList.Builder<SchemaTableName> tableNames = ImmutableList.builder();
        for (String datasetId : schemaNames) {
            for (Table table : bigQueryClient.listTables(DatasetId.of(projectId, datasetId), types)) {
                tableNames.add(new SchemaTableName(datasetId, table.getTableId().getTable()));
            }
        }
        return tableNames.build();
    }

    @Override
    public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
    {
        log.debug("getTableHandle(session=%s, tableName=%s)", session, tableName);
        Optional<TableInfo> tableInfo = getBigQueryTable(tableName);
        if (!tableInfo.isPresent()) {
            log.debug("Table [%s.%s] was not found", tableName.getSchemaName(), tableName.getTableName());
            return null;
        }
        return BigQueryTableHandle.from(tableInfo.get());
    }

    @Override
    public List<ConnectorTableLayoutResult> getTableLayouts(
            ConnectorSession session,
            ConnectorTableHandle table,
            Constraint<ColumnHandle> constraint,
            Optional<Set<ColumnHandle>> desiredColumns)
    {
        log.debug("getTableMetadata(session=%s, table=%s, constraint=%s, desiredColumns=%s)", session, table, constraint, desiredColumns);
        BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) table;
        if (desiredColumns.isPresent()) {
            bigQueryTableHandle = bigQueryTableHandle.withProjectedColumns(ImmutableList.copyOf(desiredColumns.get()));
        }
        BigQueryTableLayoutHandle bigQueryTableLayoutHandle = new BigQueryTableLayoutHandle(bigQueryTableHandle);
        return ImmutableList.of(new ConnectorTableLayoutResult(new ConnectorTableLayout(bigQueryTableLayoutHandle), constraint.getSummary()));
    }

    @Override
    public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle layoutHandle)
    {
        log.debug("getTableMetadata(session=%s, layoutHandle=%s)", session, layoutHandle);
        BigQueryTableLayoutHandle bigQueryTableLayoutHandle = (BigQueryTableLayoutHandle) layoutHandle;
        return new ConnectorTableLayout(
                bigQueryTableLayoutHandle,
                Optional.empty(), // columns of the table, not projected
                bigQueryTableLayoutHandle.getTupleDomain(), // predicate
                Optional.empty(), // tablePartitioning
                Optional.empty(), // streamPartitioningColumns
                Optional.empty(), // discretePredicates
                ImmutableList.of()); // localProperties
    }

    private Optional<TableInfo> getBigQueryTable(SchemaTableName tableName)
    {
        TableInfo tableInfo = bigQueryClient.getTable(TableId.of(projectId, tableName.getSchemaName(), tableName.getTableName()));
        return Optional.ofNullable(tableInfo);
    }

    public ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName)
    {
        ConnectorTableHandle table = getTableHandle(session, schemaTableName);
        if (table == null) {
            throw new TableNotFoundException(schemaTableName);
        }
        return getTableMetadata(session, table);
    }

    @Override
    public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        log.debug("getTableMetadata(session=%s, tableHandle=%s)", session, tableHandle);
        TableInfo table = bigQueryClient.getTable(((BigQueryTableHandle) tableHandle).getTableId());
        SchemaTableName schemaTableName = new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable());
        Schema schema = table.getDefinition().getSchema();
        List<ColumnMetadata> columns = schema == null ?
                ImmutableList.of() :
                schema.getFields().stream()
                        .map(field -> toColumnMetadata(field, normalizeIdentifier(session, field.getName())))
                        .collect(toImmutableList());
        return new ConnectorTableMetadata(schemaTableName, columns);
    }

    @Override
    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        log.debug("getColumnHandles(session=%s, tableHandle=%s)", session, tableHandle);
        TableInfo table = bigQueryClient.getTable(((BigQueryTableHandle) tableHandle).getTableId());
        Schema schema = table.getDefinition().getSchema();
        return schema == null ?
                ImmutableMap.of() :
                schema.getFields().stream().collect(toMap(Field::getName, Conversions::toColumnHandle));
    }

    @Override
    public ColumnMetadata getColumnMetadata(
            ConnectorSession session,
            ConnectorTableHandle tableHandle,
            ColumnHandle columnHandle)
    {
        log.debug("getColumnMetadata(session=%s, tableHandle=%s, columnHandle=%s)", session, columnHandle, columnHandle);
        return ((BigQueryColumnHandle) columnHandle).getColumnMetadata();
    }

    @Override
    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
    {
        log.debug("listTableColumns(session=%s, prefix=%s)", session, prefix);
        requireNonNull(prefix, "prefix is null");
        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
        for (SchemaTableName tableName : listTables(session, prefix)) {
            try {
                columns.put(tableName, getTableMetadata(session, tableName).getColumns());
            }
            catch (NotFoundException ex) {
                throw new BigQueryException(BIGQUERY_TABLE_DISAPPEAR_DURING_LIST, "Table disappeared during listing operation", ex);
            }
        }
        return columns.build();
    }

    private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
    {
        if (prefix.getTableName() == null) {
            return listTables(session, Optional.ofNullable(prefix.getSchemaName()));
        }
        SchemaTableName tableName = prefix.toSchemaTableName();
        Optional<TableInfo> tableInfo = getBigQueryTable(tableName);
        return tableInfo.isPresent() ?
                ImmutableList.of(tableName) :
                ImmutableList.of(); // table does not exist
    }
}