/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableLayout;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.IndexSourceNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
import com.facebook.presto.spi.plan.MergeJoinNode;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.SortNode;
import com.facebook.presto.spi.plan.SpatialJoinNode;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.UnnestNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution;
import com.facebook.presto.sql.planner.plan.ApplyNode;
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
import com.facebook.presto.sql.planner.plan.GroupIdNode;
import com.facebook.presto.sql.planner.plan.IndexJoinNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableWriterMergeNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UpdateNode;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static com.facebook.presto.sql.planner.optimizations.PropertyDerivations.extractFixedValuesToConstantExpressions;
import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.FIXED;
import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.MULTIPLE;
import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.SINGLE;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
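/**
 * Derives {@link StreamProperties} for a plan subtree. Stream properties describe how rows are
 * distributed across the local streams (drivers) that execute a plan node within a single task,
 * as opposed to the node-level {@link ActualProperties} computed by {@link PropertyDerivations},
 * which describe the distribution across tasks.
 *
 * <p>Illustrative usage (the {@code root}, {@code metadata}, and {@code session} values are
 * assumed to come from the caller's optimizer context):
 * <pre>{@code
 * StreamProperties properties = StreamPropertyDerivations.derivePropertiesRecursively(root, metadata, session, false);
 * if (properties.isSingleStream()) {
 *     // no local gathering exchange is needed before a single-stream operator
 * }
 * }</pre>
 */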
public final class StreamPropertyDerivations
{
private StreamPropertyDerivations() {}
public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, boolean nativeExecution)
{
List<StreamProperties> inputProperties = node.getSources().stream()
.map(source -> derivePropertiesRecursively(source, metadata, session, nativeExecution))
.collect(toImmutableList());
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, nativeExecution);
}
public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, boolean nativeExecution)
{
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, nativeExecution);
}
public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session, boolean nativeExecution)
{
requireNonNull(node, "node is null");
requireNonNull(inputProperties, "inputProperties is null");
requireNonNull(metadata, "metadata is null");
requireNonNull(session, "session is null");
// properties.otherActualProperties will never be null here because the only way
// an external caller should obtain StreamProperties is from this method, and the
// last line of this method ensures otherActualProperties is set.
ActualProperties otherProperties = PropertyDerivations.streamBackdoorDeriveProperties(
node,
inputProperties.stream()
.map(properties -> properties.otherActualProperties)
.collect(toImmutableList()),
metadata,
session);
StreamProperties result = node.accept(new Visitor(metadata, session, nativeExecution), inputProperties)
.withOtherActualProperties(otherProperties);
result.getPartitioningColumns().ifPresent(columns ->
verify(node.getOutputVariables().containsAll(columns), "Stream-level partitioning properties contain columns not present in node's output"));
Set<VariableReferenceExpression> localPropertyColumns = result.getLocalProperties().stream()
.flatMap(property -> property.getColumns().stream())
.collect(Collectors.toSet());
verify(node.getOutputVariables().containsAll(localPropertyColumns), "Stream-level local properties contain columns not present in node's output");
return result;
}
private static class Visitor
extends InternalPlanVisitor<StreamProperties, List<StreamProperties>>
{
private final Metadata metadata;
private final Session session;
private final boolean nativeExecution;
private Visitor(Metadata metadata, Session session, boolean nativeExecution)
{
this.metadata = metadata;
this.session = session;
this.nativeExecution = nativeExecution;
}
@Override
public StreamProperties visitPlan(PlanNode node, List<StreamProperties> inputProperties)
{
throw new UnsupportedOperationException("not yet implemented: " + node.getClass().getName());
}
//
// Joins
//
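// For hash joins, only the left (probe) input's stream layout survives: the right (build) side is
// consumed into a lookup source, so output rows flow on the probe side's streams. This is why the
// derivations below are expressed purely in terms of the left input's properties.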
@Override
public StreamProperties visitJoin(JoinNode node, List<StreamProperties> inputProperties)
{
StreamProperties leftProperties = inputProperties.get(0);
List<VariableReferenceExpression> outputs = node.getOutputVariables();
boolean unordered = PropertyDerivations.spillPossible(session, node.getType());
switch (node.getType()) {
case INNER:
return leftProperties
.translate(column -> PropertyDerivations.filterOrRewrite(outputs, node.getCriteria(), column))
.unordered(unordered);
case LEFT:
return leftProperties
.translate(column -> PropertyDerivations.filterIfMissing(outputs, column))
.unordered(unordered);
case RIGHT:
// since this is a right join, none of the matched output rows will contain nulls
// in the left partitioning columns, and all of the unmatched rows will have
// null for all left columns. therefore, the output is still partitioned on the
// left columns. the only change is there will be at least two streams so the
// output is multiple
// There is one exception to this: if the left is partitioned on the empty set, we
// can't say that the output is partitioned on the empty set, but we can say that
// it is partitioned on the left join symbols
// todo do something smarter after https://github.com/prestodb/presto/pull/5877 is merged
return new StreamProperties(MULTIPLE, Optional.empty(), false);
case FULL:
// the left can contain nulls in any stream so we can't say anything about the
// partitioning, and nulls from the right are produced from an extra stream,
// so we will always have multiple streams.
return new StreamProperties(MULTIPLE, Optional.empty(), false);
default:
throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
}
}
@Override
public StreamProperties visitSpatialJoin(SpatialJoinNode node, List<StreamProperties> inputProperties)
{
StreamProperties leftProperties = inputProperties.get(0);
switch (node.getType()) {
case INNER:
case LEFT:
return leftProperties.translate(column -> PropertyDerivations.filterIfMissing(node.getOutputVariables(), column));
default:
throw new IllegalArgumentException("Unsupported spatial join type: " + node.getType());
}
}
@Override
public StreamProperties visitIndexJoin(IndexJoinNode node, List<StreamProperties> inputProperties)
{
StreamProperties probeProperties = inputProperties.get(0);
switch (node.getType()) {
case INNER:
return probeProperties;
case SOURCE_OUTER:
// the probe can contain nulls in any stream so we can't say anything about the
// partitioning but the other properties of the probe will be maintained.
return probeProperties.withUnspecifiedPartitioning();
default:
throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
}
}
@Override
public StreamProperties visitMergeJoin(MergeJoinNode node, List<StreamProperties> inputProperties)
{
StreamProperties leftProperties = inputProperties.get(0);
StreamProperties rightProperties = inputProperties.get(1);
List<VariableReferenceExpression> outputs = node.getOutputVariables();
switch (node.getType()) {
case INNER:
return leftProperties
.translate(column -> PropertyDerivations.filterOrRewrite(outputs, node.getCriteria(), column));
case LEFT:
return leftProperties
.translate(column -> PropertyDerivations.filterIfMissing(outputs, column));
case RIGHT:
return rightProperties
.translate(column -> PropertyDerivations.filterIfMissing(outputs, column));
case FULL:
// the left can contain nulls in any stream so we can't say anything about the
// partitioning, and nulls from the right are produced from an extra stream,
// so we will always have multiple streams.
return new StreamProperties(MULTIPLE, Optional.empty(), false);
default:
throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
}
}
//
// Source nodes
//
@Override
public StreamProperties visitSequence(SequenceNode node, List<StreamProperties> inputProperties)
{
return new StreamProperties(MULTIPLE, Optional.empty(), false);
}
@Override
public StreamProperties visitValues(ValuesNode node, List<StreamProperties> context)
{
// values always produces a single stream
return StreamProperties.singleStream();
}
@Override
public StreamProperties visitTableScan(TableScanNode node, List<StreamProperties> inputProperties)
{
TableLayout layout = metadata.getLayout(session, node.getTable());
Map<ColumnHandle, VariableReferenceExpression> assignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse();
// Globally constant assignments
Set<ColumnHandle> constants = new HashSet<>();
extractFixedValuesToConstantExpressions(node.getCurrentConstraint()).orElse(ImmutableMap.of())
.entrySet().stream()
.filter(entry -> !entry.getValue().isNull()) // TODO consider allowing nulls
.forEach(entry -> constants.add(entry.getKey()));
Optional<Set<VariableReferenceExpression>> streamPartitionSymbols = layout.getStreamPartitioningColumns()
.flatMap(columns -> getNonConstantVariables(columns, assignments, constants));
// Native execution creates a fixed number of drivers for TableScan pipelines
StreamDistribution streamDistribution = nativeExecution ? FIXED : MULTIPLE;
// if we are partitioned on the empty set, we must report an unknown partitioning, because
// the connector does not guarantee a single split in this case (since it might not understand
// that the value is a constant).
if (streamPartitionSymbols.isPresent() && streamPartitionSymbols.get().isEmpty()) {
return new StreamProperties(streamDistribution, Optional.empty(), false);
}
return new StreamProperties(streamDistribution, streamPartitionSymbols, false);
}
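// Maps the connector-reported partitioning columns to output variables, dropping globally constant
// columns. If any remaining column is not projected by this scan (no assignment for it), the
// partitioning cannot be expressed in terms of output variables, so Optional.empty() is returned.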
private Optional<Set<VariableReferenceExpression>> getNonConstantVariables(Set<ColumnHandle> columnHandles, Map<ColumnHandle, VariableReferenceExpression> assignments, Set<ColumnHandle> globalConstants)
{
// Strip off the constants from the partitioning columns (since those are not required for translation)
Set<ColumnHandle> constantsStrippedPartitionColumns = columnHandles.stream()
.filter(column -> !globalConstants.contains(column))
.collect(toImmutableSet());
ImmutableSet.Builder<VariableReferenceExpression> builder = ImmutableSet.builder();
for (ColumnHandle column : constantsStrippedPartitionColumns) {
VariableReferenceExpression translated = assignments.get(column);
if (translated == null) {
return Optional.empty();
}
builder.add(translated);
}
return Optional.of(builder.build());
}
@Override
public StreamProperties visitExchange(ExchangeNode node, List<StreamProperties> inputProperties)
{
if (node.isEnsureSourceOrdering() || node.getOrderingScheme().isPresent()) {
return StreamProperties.ordered();
}
if (node.getScope() == REMOTE_MATERIALIZED) {
// remote materialized exchanges get converted to table scans. Return the properties that the table scan would have
return new StreamProperties(MULTIPLE, Optional.empty(), false);
}
if (node.getScope().isRemote()) {
// TODO: correctly determine if stream is parallelised
// based on session properties
return StreamProperties.fixedStreams();
}
switch (node.getType()) {
case GATHER:
return StreamProperties.singleStream();
case REPARTITION:
if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
// no strict partitioning guarantees when multiple writers per partition are allowed (scaled writers)
node.getPartitioningScheme().isScaleWriters()) {
return new StreamProperties(FIXED, Optional.empty(), false);
}
checkArgument(
node.getPartitioningScheme().getPartitioning().getArguments().stream().allMatch(VariableReferenceExpression.class::isInstance),
format("Expect all partitioning arguments to be VariableReferenceExpression, but get %s", node.getPartitioningScheme().getPartitioning().getArguments()));
return new StreamProperties(
FIXED,
Optional.of(node.getPartitioningScheme().getPartitioning().getArguments().stream()
.map(VariableReferenceExpression.class::cast)
.collect(toImmutableList())), false);
case REPLICATE:
return new StreamProperties(MULTIPLE, Optional.empty(), false);
}
throw new UnsupportedOperationException("not yet implemented");
}
//
// Nodes that rewrite and/or drop symbols
//
@Override
public StreamProperties visitProject(ProjectNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// We can describe properties in terms of inputs that are projected unmodified (i.e., identity projections)
Map<VariableReferenceExpression, VariableReferenceExpression> identities = computeIdentityTranslations(node.getAssignments().getMap());
return properties.translate(column -> Optional.ofNullable(identities.get(column)));
}
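// Example: for assignments {y := x, z := x + 1} only y := x is an identity, so a stream
// partitioned on x becomes partitioned on y, while any property referring to the input of z is dropped.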
private static Map<VariableReferenceExpression, VariableReferenceExpression> computeIdentityTranslations(Map<VariableReferenceExpression, RowExpression> assignments)
{
Map<VariableReferenceExpression, VariableReferenceExpression> inputToOutput = new HashMap<>();
for (Map.Entry<VariableReferenceExpression, RowExpression> assignment : assignments.entrySet()) {
RowExpression expression = assignment.getValue();
if (expression instanceof VariableReferenceExpression) {
inputToOutput.put((VariableReferenceExpression) expression, assignment.getKey());
}
}
return inputToOutput;
}
@Override
public StreamProperties visitGroupId(GroupIdNode node, List<StreamProperties> inputProperties)
{
Map<VariableReferenceExpression, VariableReferenceExpression> inputToOutputMappings = new HashMap<>();
for (Map.Entry<VariableReferenceExpression, VariableReferenceExpression> setMapping : node.getGroupingColumns().entrySet()) {
if (node.getCommonGroupingColumns().contains(setMapping.getKey())) {
// TODO: Add support for translating a property on a single column to multiple columns
// when GroupIdNode is copying a single input grouping column into multiple output grouping columns (i.e. aliases), this is basically picking one arbitrarily
inputToOutputMappings.putIfAbsent(setMapping.getValue(), setMapping.getKey());
}
}
// TODO: Add support for translating a property on a single column to multiple columns
// this is deliberately placed after the grouping columns, because preserving properties has a bigger perf impact
for (VariableReferenceExpression argument : node.getAggregationArguments()) {
inputToOutputMappings.putIfAbsent(argument, argument);
}
return Iterables.getOnlyElement(inputProperties).translate(column -> Optional.ofNullable(inputToOutputMappings.get(column)));
}
@Override
public StreamProperties visitAggregation(AggregationNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// Only the grouping key symbols are passed through
return properties.translate(variable -> node.getGroupingKeys().contains(variable) ? Optional.of(variable) : Optional.empty());
}
@Override
public StreamProperties visitStatisticsWriterNode(StatisticsWriterNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// analyze finish only outputs the row count
return properties.withUnspecifiedPartitioning();
}
@Override
public StreamProperties visitTableFinish(TableFinishNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// table finish only outputs the row count
return properties.withUnspecifiedPartitioning();
}
@Override
public StreamProperties visitDelete(DeleteNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// delete only outputs the row count
return properties.withUnspecifiedPartitioning();
}
@Override
public StreamProperties visitTableWriter(TableWriterNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// table writer only outputs the row count
return properties.withUnspecifiedPartitioning();
}
@Override
public StreamProperties visitUpdate(UpdateNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
return properties.withUnspecifiedPartitioning();
}
@Override
public StreamProperties visitTableWriteMerge(TableWriterMergeNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitUnnest(UnnestNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// We can describe properties in terms of inputs that are projected unmodified (i.e., not the unnested symbols)
Set<VariableReferenceExpression> passThroughInputs = ImmutableSet.copyOf(node.getReplicateVariables());
return properties.translate(column -> {
if (passThroughInputs.contains(column)) {
return Optional.of(column);
}
return Optional.empty();
});
}
@Override
public StreamProperties visitExplainAnalyze(ExplainAnalyzeNode node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
// explain only outputs the plan string
return properties.withUnspecifiedPartitioning();
}
//
// Nodes that gather data into a single stream
//
@Override
public StreamProperties visitIndexSource(IndexSourceNode node, List<StreamProperties> context)
{
return StreamProperties.singleStream();
}
@Override
public StreamProperties visitUnion(UnionNode node, List<StreamProperties> context)
{
// union is implemented using a local gather exchange
return StreamProperties.singleStream();
}
@Override
public StreamProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<StreamProperties> context)
{
return StreamProperties.singleStream();
}
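// AssignUniqueId attaches a per-row unique id. If the input partitioning is unknown, it is safe to
// describe the output as partitioned on the id column, since every id value appears in exactly one
// row (and therefore in exactly one stream).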
@Override
public StreamProperties visitAssignUniqueId(AssignUniqueId node, List<StreamProperties> inputProperties)
{
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
if (properties.getPartitioningColumns().isPresent()) {
// preserve input (possibly preferred) partitioning
return properties;
}
return new StreamProperties(properties.getDistribution(),
Optional.of(ImmutableList.of(node.getIdVariable())),
properties.isOrdered());
}
//
// Simple nodes that pass through stream properties
//
@Override
public StreamProperties visitOutput(OutputNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties)
.translate(column -> PropertyDerivations.filterIfMissing(node.getOutputVariables(), column));
}
@Override
public StreamProperties visitMarkDistinct(MarkDistinctNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitWindow(WindowNode node, List<StreamProperties> inputProperties)
{
StreamProperties childProperties = Iterables.getOnlyElement(inputProperties);
if (childProperties.isSingleStream() && node.getPartitionBy().isEmpty() && node.getOrderingScheme().isPresent()) {
return StreamProperties.ordered();
}
return childProperties;
}
@Override
public StreamProperties visitRowNumber(RowNumberNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitTopNRowNumber(TopNRowNumberNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitTopN(TopNNode node, List<StreamProperties> inputProperties)
{
// Partial TopN doesn't guarantee that the stream is ordered
if (node.getStep().equals(TopNNode.Step.PARTIAL)) {
return Iterables.getOnlyElement(inputProperties);
}
return StreamProperties.ordered();
}
@Override
public StreamProperties visitSort(SortNode node, List<StreamProperties> inputProperties)
{
StreamProperties sourceProperties = Iterables.getOnlyElement(inputProperties);
if (sourceProperties.isSingleStream()) {
// the stream is only sorted if the sort operator is executed without parallelism
return StreamProperties.ordered();
}
return sourceProperties;
}
@Override
public StreamProperties visitLimit(LimitNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitDistinctLimit(DistinctLimitNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitSemiJoin(SemiJoinNode node, List<StreamProperties> inputProperties)
{
return inputProperties.get(0);
}
@Override
public StreamProperties visitApply(ApplyNode node, List<StreamProperties> inputProperties)
{
throw new IllegalStateException("Unexpected node: " + node.getClass());
}
@Override
public StreamProperties visitLateralJoin(LateralJoinNode node, List<StreamProperties> inputProperties)
{
throw new IllegalStateException("Unexpected node: " + node.getClass());
}
@Override
public StreamProperties visitFilter(FilterNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
@Override
public StreamProperties visitSample(SampleNode node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
}
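// A remote source with an ordering scheme performs a merge and therefore produces a single ordered
// stream; one that merely preserves source ordering is gathered into a single stream; otherwise the
// remote exchange is read by a fixed number of local streams.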
@Override
public StreamProperties visitRemoteSource(RemoteSourceNode node, List<StreamProperties> inputProperties)
{
if (node.getOrderingScheme().isPresent()) {
return StreamProperties.ordered();
}
if (node.isEnsureSourceOrdering()) {
return StreamProperties.singleStream();
}
return StreamProperties.fixedStreams();
}
}
public static final class StreamProperties
{
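// SINGLE: exactly one stream per task; FIXED: a fixed number of parallel streams (e.g. after a
// local hash repartitioning); MULTIPLE: an arbitrary, unknown number of streams.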
public enum StreamDistribution
{
SINGLE, MULTIPLE, FIXED
}
private final StreamDistribution distribution;
private final Optional<List<VariableReferenceExpression>> partitioningColumns; // if missing => partitioned with some unknown scheme
private final boolean ordered;
// We are only interested in the local properties, but PropertyDerivations requires input
// ActualProperties, so we hold on to the whole object
private final ActualProperties otherActualProperties;
// NOTE: Partitioning on zero columns (or effectively zero columns if the columns are constant) indicates that all
// the rows will be partitioned into a single stream.
private StreamProperties(StreamDistribution distribution, Optional<? extends Iterable<VariableReferenceExpression>> partitioningColumns, boolean ordered)
{
this(distribution, partitioningColumns, ordered, null);
}
private StreamProperties(
StreamDistribution distribution,
Optional<? extends Iterable<VariableReferenceExpression>> partitioningColumns,
boolean ordered,
ActualProperties otherActualProperties)
{
this.distribution = requireNonNull(distribution, "distribution is null");
this.partitioningColumns = requireNonNull(partitioningColumns, "partitioningColumns is null")
.map(ImmutableList::copyOf);
checkArgument(distribution != SINGLE || this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
"Single stream must be partitioned on empty set");
checkArgument(distribution == SINGLE || !this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
"Multiple streams must not be partitioned on empty set");
this.ordered = ordered;
checkArgument(!ordered || distribution == SINGLE, "An ordered stream must be a single stream");
this.otherActualProperties = otherActualProperties;
}
public List<LocalProperty<VariableReferenceExpression>> getLocalProperties()
{
checkState(otherActualProperties != null, "otherActualProperties not set");
return otherActualProperties.getLocalProperties();
}
private static StreamProperties singleStream()
{
return new StreamProperties(SINGLE, Optional.of(ImmutableSet.of()), false);
}
private static StreamProperties fixedStreams()
{
return new StreamProperties(FIXED, Optional.empty(), false);
}
private static StreamProperties ordered()
{
return new StreamProperties(SINGLE, Optional.of(ImmutableSet.of()), true);
}
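// Called when an operator may emit rows out of their input order (for example a join that can
// spill); ordering-dependent local properties are dropped while the distribution is preserved.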
private StreamProperties unordered(boolean unordered)
{
if (unordered) {
ActualProperties updatedProperties = null;
if (otherActualProperties != null) {
updatedProperties = ActualProperties.builderFrom(otherActualProperties)
.unordered(true)
.build();
}
return new StreamProperties(
distribution,
partitioningColumns,
false,
updatedProperties);
}
return this;
}
public boolean isSingleStream()
{
return distribution == SINGLE;
}
public StreamDistribution getDistribution()
{
return distribution;
}
public boolean isExactlyPartitionedOn(Iterable<VariableReferenceExpression> columns)
{
return partitioningColumns.isPresent() && columns.equals(ImmutableList.copyOf(partitioningColumns.get()));
}
public boolean isPartitionedOn(Iterable<VariableReferenceExpression> columns)
{
if (!partitioningColumns.isPresent()) {
return false;
}
// partitioned on (k_1, k_2, ..., k_n) => partitioned on (k_1, k_2, ..., k_n, k_n+1, ...)
// can safely ignore all constant columns when comparing partition properties
return ImmutableSet.copyOf(columns).containsAll(partitioningColumns.get());
}
public boolean isOrdered()
{
return ordered;
}
private StreamProperties withUnspecifiedPartitioning()
{
// a single stream has no symbols
if (isSingleStream()) {
return this;
}
// otherwise we are distributed on some symbols, but since we are trying to remove all symbols,
// just say we have multiple partitions with an unknown scheme
return new StreamProperties(distribution, Optional.empty(), ordered);
}
private StreamProperties withOtherActualProperties(ActualProperties actualProperties)
{
return new StreamProperties(distribution, partitioningColumns, ordered, actualProperties);
}
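// Translation is all-or-nothing for the partitioning columns: if any partitioning column cannot be
// mapped to an output variable, the entire partitioning becomes unknown (Optional.empty()).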
public StreamProperties translate(Function<VariableReferenceExpression, Optional<VariableReferenceExpression>> translator)
{
return new StreamProperties(
distribution,
partitioningColumns.flatMap(partitioning -> {
ImmutableList.Builder<VariableReferenceExpression> newPartitioningColumns = ImmutableList.builder();
for (VariableReferenceExpression partitioningColumn : partitioning) {
Optional<VariableReferenceExpression> translated = translator.apply(partitioningColumn);
if (!translated.isPresent()) {
return Optional.empty();
}
newPartitioningColumns.add(translated.get());
}
return Optional.of(newPartitioningColumns.build());
}),
ordered, otherActualProperties.translateVariable(translator));
}
public Optional<List<VariableReferenceExpression>> getPartitioningColumns()
{
return partitioningColumns;
}
@Override
public int hashCode()
{
return Objects.hash(distribution, partitioningColumns);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
StreamProperties other = (StreamProperties) obj;
return Objects.equals(this.distribution, other.distribution) &&
Objects.equals(this.partitioningColumns, other.partitioningColumns);
}
@Override
public String toString()
{
return toStringHelper(this)
.add("distribution", distribution)
.add("partitioningColumns", partitioningColumns)
.toString();
}
}
}