PinotQueryGeneratorContext.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot.query;

import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.pinot.PinotColumnHandle;
import com.facebook.presto.pinot.PinotConfig;
import com.facebook.presto.pinot.PinotException;
import com.facebook.presto.pinot.PinotQueryOptionsUtils;
import com.facebook.presto.pinot.PinotSessionProperties;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.facebook.presto.pinot.PinotErrorCode.PINOT_QUERY_GENERATOR_FAILURE;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
import static com.facebook.presto.pinot.PinotPushdownUtils.PINOT_DISTINCT_COUNT_FUNCTION_NAME;
import static com.facebook.presto.pinot.PinotPushdownUtils.checkSupported;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.lang.StrictMath.toIntExact;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * Encapsulates the components needed to construct a Pinot SQL query and provides methods to update the current context with new operations.
 * Every {@code with*} method validates that the operation can be pushed down to Pinot and returns a new context rather than modifying this one.
 */
public class PinotQueryGeneratorContext
{
    public static final String TIME_BOUNDARY_FILTER_TEMPLATE = "__TIME_BOUNDARY_FILTER_TEMPLATE__";
    public static final String TABLE_NAME_SUFFIX_TEMPLATE = "__TABLE_NAME_SUFFIX_TEMPLATE__";
    // Fields defining the query
    // Maps each variable to its column definition in terms of the input relation column(s).
    private final Map<VariableReferenceExpression, Selection> selections;
    // Outputs are the fields defined in Presto for what's expected from the Pinot query.
    // Outputs, groupByColumns and the keys of topNColumnInformationMap should all be keys of selections.
    private final LinkedHashSet<VariableReferenceExpression> outputs;
    private final LinkedHashSet<VariableReferenceExpression> groupByColumns;
    private final LinkedHashMap<VariableReferenceExpression, OrderingColumnInformation> topNColumnInformationMap;
    // Columns that are part of the generated query but are skipped (output index -1) when mapping Pinot results back to Presto.
    private final Set<VariableReferenceExpression> hiddenColumnSet;
    private final Set<VariableReferenceExpression> variablesInAggregation;
    private final Optional<String> from;
    private final Optional<String> filter;
    private final OptionalInt limit;
    // Number of aggregations applied; a positive value means this context already aggregates.
    private final int aggregations;

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("selections", selections)
                .add("outputs", outputs)
                .add("groupByColumns", groupByColumns)
                .add("topNColumnInformationMap", topNColumnInformationMap)
                .add("hiddenColumnSet", hiddenColumnSet)
                .add("variablesInAggregation", variablesInAggregation)
                .add("from", from)
                .add("filter", filter)
                .add("limit", limit)
                .add("aggregations", aggregations)
                .toString();
    }

    /**
     * Creates an empty context with no selections, no outputs and no table.
     */
    public PinotQueryGeneratorContext()
    {
        this(new HashMap<>(), new LinkedHashSet<>(), null);
    }

    /**
     * Creates a context that only selects from a table, with no filter, aggregation, group by, order by or limit.
     *
     * @param selections definitions of the selected variables
     * @param outputs variables expected as output, in order
     * @param from table name, or {@code null} if not known yet
     */
    PinotQueryGeneratorContext(
            Map<VariableReferenceExpression, Selection> selections,
            LinkedHashSet<VariableReferenceExpression> outputs,
            String from)
    {
        this(
                selections,
                outputs,
                Optional.ofNullable(from),
                Optional.empty(),
                0,
                new LinkedHashSet<>(),
                new LinkedHashMap<>(),
                OptionalInt.empty(),
                new HashSet<>(),
                new HashSet<>());
    }

    private PinotQueryGeneratorContext(
            Map<VariableReferenceExpression, Selection> selections,
            LinkedHashSet<VariableReferenceExpression> outputs,
            Optional<String> from,
            Optional<String> filter,
            int aggregations,
            LinkedHashSet<VariableReferenceExpression> groupByColumns,
            LinkedHashMap<VariableReferenceExpression, OrderingColumnInformation> topNColumnInformationMap,
            OptionalInt limit,
            Set<VariableReferenceExpression> variablesInAggregation,
            Set<VariableReferenceExpression> hiddenColumnSet)
    {
        this.selections = new HashMap<>(requireNonNull(selections, "selections can't be null"));
        this.outputs = new LinkedHashSet<>(requireNonNull(outputs, "outputs can't be null."));
        this.from = requireNonNull(from, "source can't be null");
        this.aggregations = aggregations;
        this.groupByColumns = new LinkedHashSet<>(requireNonNull(groupByColumns, "groupByColumns can't be null. It could be empty if not available"));
        this.topNColumnInformationMap = new LinkedHashMap<>(requireNonNull(topNColumnInformationMap, "topNColumnInformationMap can't be null. It could be empty if not available"));
        this.filter = requireNonNull(filter, "filter is null");
        this.limit = requireNonNull(limit, "limit is null");
        // NOTE(review): unlike the collections above, these two sets are stored by reference (not copied) and may be shared between contexts.
        this.hiddenColumnSet = requireNonNull(hiddenColumnSet, "hidden column set is null");
        this.variablesInAggregation = requireNonNull(variablesInAggregation, "variables in aggregation is null");
    }

    /**
     * Apply the given filter to current context and return the updated context. Throws error for invalid operations.
     * A filter cannot be added when the context already has a filter, an aggregation or a limit.
     */
    public PinotQueryGeneratorContext withFilter(String filter)
    {
        checkSupported(!hasFilter(), "There already exists a filter. Pinot doesn't support filters at multiple levels");
        checkSupported(!hasAggregation(), "Pinot doesn't support filtering the results of aggregation");
        checkSupported(!hasLimit(), "Pinot doesn't support filtering on top of the limit");
        return new PinotQueryGeneratorContext(
                selections,
                outputs,
                from,
                Optional.of(filter),
                aggregations,
                groupByColumns,
                topNColumnInformationMap,
                limit,
                variablesInAggregation,
                hiddenColumnSet);
    }

    /**
     * Apply the aggregation to current context and return the updated context. Throws error for invalid operations.
     * Only one level of aggregation is supported, except when the new selections push down a distinct count: in that case
     * an existing aggregation is allowed and the hidden column set is cleared.
     */
    public PinotQueryGeneratorContext withAggregation(
            Map<VariableReferenceExpression, Selection> newSelections,
            LinkedHashSet<VariableReferenceExpression> outputs,
            LinkedHashSet<VariableReferenceExpression> groupByColumns,
            int aggregations,
            Set<VariableReferenceExpression> hiddenColumnSet)
    {
        // Hoisted out of the loop; Locale.ENGLISH keeps the upper-cased function name locale-independent.
        String distinctCountPrefix = PINOT_DISTINCT_COUNT_FUNCTION_NAME.toUpperCase(Locale.ENGLISH);
        AtomicBoolean pushDownDistinctCount = new AtomicBoolean(false);
        newSelections.values().forEach(selection -> {
            if (selection.getDefinition().startsWith(distinctCountPrefix)) {
                pushDownDistinctCount.set(true);
            }
        });

        Set<VariableReferenceExpression> newHiddenColumnSet = hiddenColumnSet;
        if (pushDownDistinctCount.get()) {
            // Push down count distinct query to Pinot, clean up hidden column set by the non-aggregation groupBy Plan.
            newHiddenColumnSet = ImmutableSet.of();
        }
        else {
            checkSupported(!hasAggregation(), "Pinot doesn't support aggregation on top of the aggregated data");
        }
        checkSupported(!hasLimit(), "Pinot doesn't support aggregation on top of the limit");

        // The constructor copies outputs, so no defensive copy is needed here.
        return new PinotQueryGeneratorContext(newSelections, outputs, from, filter, aggregations, groupByColumns, topNColumnInformationMap, limit, variablesInAggregation, newHiddenColumnSet);
    }

    /**
     * Apply new selections/project to current context and return the updated context. Throws error for invalid operations.
     */
    public PinotQueryGeneratorContext withProject(Map<VariableReferenceExpression, Selection> newSelections, LinkedHashSet<VariableReferenceExpression> newOutputs)
    {
        checkSupported(!newOutputs.isEmpty(), "Missing output expression in Pinot query context");
        return new PinotQueryGeneratorContext(
                newSelections,
                newOutputs,
                from,
                filter,
                aggregations,
                groupByColumns,
                topNColumnInformationMap,
                limit,
                variablesInAggregation,
                hiddenColumnSet);
    }

    /**
     * Validates that the limit is in {@code [1, Integer.MAX_VALUE]} and returns it as an int.
     *
     * @throws PinotException if the limit is out of range
     */
    private static int checkForValidLimit(long limit)
    {
        if (limit <= 0 || limit > Integer.MAX_VALUE) {
            throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Limit " + limit + " not supported: Limit is not being pushed down");
        }
        return toIntExact(limit);
    }

    /**
     * Apply limit to current context and return the updated context. Throws error for invalid operations.
     */
    public PinotQueryGeneratorContext withLimit(long limit)
    {
        int intLimit = checkForValidLimit(limit);
        checkSupported(!hasLimit(), "Limit already exists. Pinot doesn't support limit on top of another limit");
        return new PinotQueryGeneratorContext(
                selections,
                outputs,
                from,
                filter,
                aggregations,
                groupByColumns,
                topNColumnInformationMap,
                OptionalInt.of(intLimit),
                variablesInAggregation,
                hiddenColumnSet);
    }

    /**
     * Apply order by to current context and return the updated context. Throws error for invalid operations.
     * Every ordering variable must have a selection in this context.
     */
    public PinotQueryGeneratorContext withTopN(LinkedHashMap<VariableReferenceExpression, SortOrder> orderByColumnOrderingMap, long limit)
    {
        checkSupported(!hasLimit(), "Limit already exists. Pinot doesn't support order by limit on top of another limit");
        int intLimit = checkForValidLimit(limit);

        // LinkedHashMap.forEach iterates in insertion order, so the ORDER BY column order is preserved.
        LinkedHashMap<VariableReferenceExpression, OrderingColumnInformation> orderByColumnInformation = new LinkedHashMap<>();
        orderByColumnOrderingMap.forEach((variable, sortOrder) ->
                orderByColumnInformation.put(variable, new OrderingColumnInformation(sortOrder, selections.get(variable))));

        return new PinotQueryGeneratorContext(
                selections,
                outputs,
                from,
                filter,
                aggregations,
                groupByColumns,
                orderByColumnInformation,
                OptionalInt.of(intLimit),
                variablesInAggregation,
                hiddenColumnSet);
    }

    private boolean hasFilter()
    {
        return filter.isPresent();
    }

    private boolean hasLimit()
    {
        return limit.isPresent();
    }

    public boolean hasGroupBy()
    {
        return !groupByColumns.isEmpty();
    }

    public boolean hasAggregation()
    {
        return aggregations > 0;
    }

    private boolean hasOrderBy()
    {
        return !topNColumnInformationMap.isEmpty();
    }

    public Map<VariableReferenceExpression, Selection> getSelections()
    {
        return selections;
    }

    public Set<VariableReferenceExpression> getHiddenColumnSet()
    {
        return hiddenColumnSet;
    }

    Set<VariableReferenceExpression> getVariablesInAggregation()
    {
        return variablesInAggregation;
    }

    /**
     * Convert the current context to a Pinot query; delegates to {@link #toSqlQuery(PinotConfig, ConnectorSession)}.
     */
    public PinotQueryGenerator.GeneratedPinotQuery toQuery(PinotConfig pinotConfig, ConnectorSession session)
    {
        return toSqlQuery(pinotConfig, session);
    }

    /**
     * Generates the Pinot query text from the given select expressions, table name and limit clause, adding the
     * WHERE / GROUP BY / ORDER BY clauses from this context.
     * For non-broker (segment) queries, the table name suffix and time boundary filter placeholders are appended so they can be
     * substituted later.
     */
    private String generatePinotQueryHelper(boolean forBroker, String expressions, String tableName, String limitClause, String queryOptions)
    {
        String query = "SELECT " + expressions + " FROM " + tableName + (forBroker ? "" : TABLE_NAME_SUFFIX_TEMPLATE);
        if (filter.isPresent()) {
            String filterString = filter.get();
            // This is a hack. Ideally we want to clone the scan pipeline and create/update the filter in the scan pipeline to contain this filter and
            // at the same time add the time column to scan so that the query generator doesn't fail when it looks up the time column in scan output columns
            query += format(" WHERE %s%s", filterString, forBroker ? "" : TIME_BOUNDARY_FILTER_TEMPLATE);
        }
        else if (!forBroker) {
            query += TIME_BOUNDARY_FILTER_TEMPLATE;
        }

        if (hasGroupBy()) {
            String groupByExpr = groupByColumns.stream().map(x -> selections.get(x).getDefinition()).collect(Collectors.joining(", "));
            query = query + " GROUP BY " + groupByExpr;
        }

        if (hasOrderBy()) {
            String orderByExpressions = topNColumnInformationMap.entrySet().stream().map(entry -> (entry.getValue().getSelection().getDefinition()) + (entry.getValue().getSortOrder().isAscending() ? "" : " DESC")).collect(Collectors.joining(", "));
            query = query + " ORDER BY " + orderByExpressions;
        }
        query = query + limitClause;
        query = query + queryOptions;
        return query;
    }

    /**
     * Convert the current context to a Pinot SQL query.
     * Decides whether the query targets the broker or the servers (segments), chooses the limit, and computes the mapping from
     * Pinot result columns to Presto output columns.
     */
    public PinotQueryGenerator.GeneratedPinotQuery toSqlQuery(PinotConfig pinotConfig, ConnectorSession session)
    {
        int nonAggregateShortQueryLimit = PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session);
        boolean isQueryShort = (hasAggregation() || hasGroupBy()) || limit.orElse(Integer.MAX_VALUE) < nonAggregateShortQueryLimit;
        boolean attemptBrokerQueries = PinotSessionProperties.isAttemptBrokerQueries(session) || isQueryShort;
        boolean forBroker = !PinotSessionProperties.isForbidBrokerQueries(session) && attemptBrokerQueries;
        String groupByExpressions = groupByColumns.stream()
                .map(x -> selections.get(x).getDefinition())
                .collect(Collectors.joining(", "));

        checkSupported(!outputs.isEmpty(), "Unable to generate Pinot query without output expression");
        // Group by expressions are emitted first, followed by the remaining (non group by) outputs.
        String selectExpressions = outputs.stream()
                .filter(o -> !groupByColumns.contains(o))
                .map(o -> updateSelection(selections.get(o).getDefinition(), session))
                .collect(Collectors.joining(", "));
        String expressions = (groupByExpressions.isEmpty()) ?
                selectExpressions :
                (selectExpressions.isEmpty()) ?
                        groupByExpressions :
                        groupByExpressions + ", " + selectExpressions;

        String tableName = from.orElseThrow(() -> new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Table name not encountered yet"));

        // Rules for limit:
        // - If it's a selection query (no aggregation and no group by):
        //      + given limit or configured limit
        // - Else if it has group by:
        //      + given limit or configured top limit
        // - Aggregation only query limit is ignored.
        // - Fail if limit is invalid
        int queryLimit = -1;
        if (!hasAggregation() && !hasGroupBy()) {
            // NOTE(review): forBroker implies attemptBrokerQueries, so this condition can never hold as written and
            // the exception is unreachable; the configured segment limit is used whenever no limit is given.
            if (!limit.isPresent() && forBroker && !attemptBrokerQueries) {
                throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Broker non aggregate queries have to have a limit");
            }
            else {
                queryLimit = limit.orElseGet(() -> PinotSessionProperties.getLimitLargerForSegment(session));
            }
        }
        else if (hasGroupBy()) {
            queryLimit = limit.orElseGet(() -> PinotSessionProperties.getTopNLarge(session));
        }
        String limitClause = "";
        if (queryLimit > 0) {
            limitClause = " LIMIT " + queryLimit;
        }
        String queryOptionsProperty = PinotSessionProperties.getQueryOptions(session);
        String queryOptions = PinotQueryOptionsUtils.getQueryOptionsAsString(queryOptionsProperty);
        String query = generatePinotQueryHelper(forBroker, expressions, tableName, limitClause, queryOptions);
        LinkedHashMap<VariableReferenceExpression, PinotColumnHandle> assignments = getAssignments();
        List<Integer> indices = getIndicesMappingFromPinotSchemaToPrestoSchema(query, assignments);
        return new PinotQueryGenerator.GeneratedPinotQuery(tableName, query, indices, filter.isPresent(), forBroker);
    }

    /**
     * Replaces the first upper-cased distinct count call (e.g. {@code DISTINCTCOUNT(}) in the definition with the
     * session's override function, unless the override is the distinct count function itself.
     */
    private String updateSelection(String definition, ConnectorSession session)
    {
        final String overrideDistinctCountFunction = PinotSessionProperties.getOverrideDistinctCountFunction(session);
        if (PINOT_DISTINCT_COUNT_FUNCTION_NAME.equalsIgnoreCase(overrideDistinctCountFunction)) {
            return definition;
        }
        // Locale.ENGLISH keeps the names ASCII regardless of the JVM default locale (e.g. no dotted capital I under a Turkish
        // locale), matching the distinct count detection in withAggregation. The replacement is done literally instead of via
        // String.replaceFirst so that characters like '$' or '\' in the override name are not treated as regex syntax.
        String replacement = overrideDistinctCountFunction.toUpperCase(Locale.ENGLISH) + "(";
        String target = PINOT_DISTINCT_COUNT_FUNCTION_NAME.toUpperCase(Locale.ENGLISH) + "(";
        int index = definition.indexOf(target);
        if (index < 0) {
            return definition;
        }
        return definition.substring(0, index) + replacement + definition.substring(index + target.length());
    }

    /**
     * Computes, for each column Pinot returns (group by columns first, then outputs), the index of the matching Presto
     * assignment, or -1 if the value should be skipped (hidden or not assigned).
     *
     * @throws IllegalStateException if a group by or output column has no selection
     */
    private List<Integer> getIndicesMappingFromPinotSchemaToPrestoSchema(String query, Map<VariableReferenceExpression, PinotColumnHandle> assignments)
    {
        // LinkedHashMap: an output that is also a group by column keeps its group by position.
        LinkedHashMap<VariableReferenceExpression, Selection> expressionsInPinotOrder = new LinkedHashMap<>();
        for (VariableReferenceExpression groupByColumn : groupByColumns) {
            Selection groupByColumnDefinition = selections.get(groupByColumn);
            if (groupByColumnDefinition == null) {
                throw new IllegalStateException(format(
                        "Group By column (%s) definition not found in input selections: %s",
                        groupByColumn,
                        Joiner.on(",").withKeyValueSeparator(":").join(selections)));
            }
            expressionsInPinotOrder.put(groupByColumn, groupByColumnDefinition);
        }
        for (VariableReferenceExpression outputColumn : outputs) {
            Selection outputColumnDefinition = selections.get(outputColumn);
            if (outputColumnDefinition == null) {
                throw new IllegalStateException(format(
                        "Output column (%s) definition not found in input selections: %s",
                        outputColumn,
                        Joiner.on(",").withKeyValueSeparator(":").join(selections)));
            }
            expressionsInPinotOrder.put(outputColumn, outputColumnDefinition);
        }

        checkSupported(
                assignments.size() <= expressionsInPinotOrder.keySet().stream().filter(key -> !hiddenColumnSet.contains(key)).count(),
                "Expected returned expressions %s is a superset of selections %s",
                Joiner.on(",").withKeyValueSeparator(":").join(expressionsInPinotOrder),
                Joiner.on(",").withKeyValueSeparator("=").join(assignments));

        // Position of each assignment in the Presto output.
        Map<VariableReferenceExpression, Integer> assignmentToIndex = new HashMap<>();
        Iterator<Map.Entry<VariableReferenceExpression, PinotColumnHandle>> assignmentsIterator = assignments.entrySet().iterator();
        for (int i = 0; i < assignments.size(); i++) {
            VariableReferenceExpression key = assignmentsIterator.next().getKey();
            Integer previous = assignmentToIndex.put(key, i);
            // NOTE(review): map keys are unique, so this defensive check is not expected to trigger.
            if (previous != null) {
                throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query), format("Expected Pinot column handle %s to occur only once, but we have: %s", key, Joiner.on(",").withKeyValueSeparator("=").join(assignments)));
            }
        }

        ImmutableList.Builder<Integer> outputIndices = ImmutableList.builder();
        for (Map.Entry<VariableReferenceExpression, Selection> expression : expressionsInPinotOrder.entrySet()) {
            Integer index;
            if (hiddenColumnSet.contains(expression.getKey())) {
                index = -1; // negative output index means to skip this value returned by pinot at query time
            }
            else {
                index = assignmentToIndex.getOrDefault(expression.getKey(), -1); // negative output index means to skip this value returned by pinot at query time
            }
            outputIndices.add(index);
        }
        return outputIndices.build();
    }

    /**
     * Returns a column handle for every non-hidden output, in output order. Table columns map to REGULAR handles named
     * after their definition; everything else maps to DERIVED handles.
     */
    public LinkedHashMap<VariableReferenceExpression, PinotColumnHandle> getAssignments()
    {
        LinkedHashMap<VariableReferenceExpression, PinotColumnHandle> result = new LinkedHashMap<>();
        for (VariableReferenceExpression variable : outputs) {
            if (hiddenColumnSet.contains(variable)) {
                continue;
            }
            Selection selection = selections.get(variable);
            PinotColumnHandle handle = selection.getOrigin() == Origin.TABLE_COLUMN ? new PinotColumnHandle(selection.getDefinition(), variable.getType(), PinotColumnHandle.PinotColumnType.REGULAR) : new PinotColumnHandle(variable, PinotColumnHandle.PinotColumnType.DERIVED);
            result.put(variable, handle);
        }
        return result;
    }

    /**
     * Returns a context whose outputs are the given columns followed by any hidden columns carried over from this context.
     *
     * @throws NullPointerException if an output column has no selection in this context
     */
    public PinotQueryGeneratorContext withOutputColumns(List<VariableReferenceExpression> outputColumns)
    {
        LinkedHashSet<VariableReferenceExpression> newOutputs = new LinkedHashSet<>(outputColumns);
        // The message supplier is lazy so the (potentially large) context toString() is only built on failure.
        outputColumns.forEach(o -> requireNonNull(selections.get(o), () -> format("Cannot find the selection %s in the original context %s", o, this)));
        // Hidden columns flow as is from the previous
        selections.entrySet().stream().filter(e -> hiddenColumnSet.contains(e.getKey())).forEach(e -> newOutputs.add(e.getKey()));
        return new PinotQueryGeneratorContext(selections, newOutputs, from, filter, aggregations, groupByColumns, topNColumnInformationMap, limit, variablesInAggregation, hiddenColumnSet);
    }

    /**
     * Returns a copy of this context with the given set of variables used in aggregation.
     */
    public PinotQueryGeneratorContext withVariablesInAggregation(Set<VariableReferenceExpression> newVariablesInAggregation)
    {
        return new PinotQueryGeneratorContext(
                selections,
                outputs,
                from,
                filter,
                aggregations,
                groupByColumns,
                topNColumnInformationMap,
                limit,
                newVariablesInAggregation,
                hiddenColumnSet);
    }

    /**
     * Applies a DISTINCT with limit, expressed as a group by over the given columns plus a limit.
     * Not allowed on top of an existing limit, group by or aggregation.
     */
    public PinotQueryGeneratorContext withDistinctLimit(LinkedHashSet<VariableReferenceExpression> newGroupByColumns, long limit)
    {
        int intLimit = checkForValidLimit(limit);
        checkSupported(!hasLimit(), "Limit already exists. Pinot doesn't support limit on top of another limit");
        checkSupported(!hasGroupBy(), "GroupBy already exists. Pinot doesn't support Distinct on top of another Group By");
        checkSupported(!hasAggregation(), "Aggregation already exists. Pinot doesn't support Distinct Limit on top of Aggregation");
        return new PinotQueryGeneratorContext(
                selections,
                outputs,
                from,
                filter,
                aggregations,
                newGroupByColumns,
                topNColumnInformationMap,
                OptionalInt.of(intLimit),
                variablesInAggregation,
                hiddenColumnSet);
    }

    /**
     * Where is the selection/projection originated from
     */
    public enum Origin
    {
        TABLE_COLUMN, // refers to direct column in table
        DERIVED, // expression is derived from one or more input columns or a combination of input columns and literals
        LITERAL, // derived from literal
    }

    // Projected/selected column definition in query
    public static class Selection
    {
        private final String definition;
        private final Origin origin;

        /**
         * Table column definitions are wrapped in double quotes unless they already start with one; other definitions are kept as is.
         */
        public Selection(String definition, Origin origin)
        {
            this.origin = origin;
            this.definition = (origin == Origin.TABLE_COLUMN) ? (definition.startsWith("\"") ? definition : format("\"%s\"", definition)) : definition;
        }

        public String getDefinition()
        {
            return definition;
        }

        public Origin getOrigin()
        {
            return origin;
        }

        @Override
        public String toString()
        {
            return definition;
        }
    }

    // Columns definitions and ordering information for OrderBy Columns
    private static class OrderingColumnInformation
    {
        private final SortOrder sortOrder;
        private final Selection selection;

        public OrderingColumnInformation(SortOrder sortOrder, Selection selection)
        {
            this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
            this.selection = requireNonNull(selection, "selection is null");
        }

        public SortOrder getSortOrder()
        {
            return sortOrder;
        }

        public Selection getSelection()
        {
            return selection;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("sortOrder", sortOrder.isAscending() ? "ASC" : "DESC")
                    .add("selection", selection)
                    .toString();
        }
    }
}