// TestPrestoSparkPhysicalResourceAllocationStrategy.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark.planner;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.AbstractMockMetadata;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spark.PrestoSparkPhysicalResourceCalculator;
import com.facebook.presto.spark.PrestoSparkSourceStatsCollector;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.prestospark.PhysicalResourceSettings;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder;
import com.facebook.presto.testing.TestingMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import org.testng.annotations.Test;
import java.util.List;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_AVERAGE_INPUT_DATA_SIZE_PER_PARTITION;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MAX_EXECUTOR_COUNT;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MAX_HASH_PARTITION_COUNT;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MIN_EXECUTOR_COUNT;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MIN_HASH_PARTITION_COUNT;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static org.testng.Assert.assertEquals;
public class TestPrestoSparkPhysicalResourceAllocationStrategy
{
    /**
     * Metadata stub for these tests. The only method overridden here is
     * {@code getTableStatistics}, which always reports a row count of 100 and the
     * total-size estimate handed to the constructor.
     * <p>
     * NOTE(review): every other method is inherited from {@link AbstractMockMetadata};
     * presumably those throw {@code UnsupportedOperationException} - confirm before relying on them.
     */
    private static class MockedMetadata
            extends AbstractMockMetadata
    {
        private final Estimate tableSizeEstimate;

        public MockedMetadata(Estimate mockedTableSizeEstimate)
        {
            this.tableSizeEstimate = mockedTableSizeEstimate;
        }

        @Override
        public TableStatistics getTableStatistics(Session session, TableHandle tableHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
        {
            // The arguments are ignored: every table gets the same canned statistics.
            return TableStatistics.builder()
                    .setRowCount(Estimate.of(100))
                    .setTotalSize(tableSizeEstimate)
                    .build();
        }
    }

    // Session property definitions (with their default values) registered in both test sessions.
    private static final PropertyMetadata<?>[] COMMON_PROPERTIES = new PropertyMetadata[] {
            PropertyMetadata.integerProperty(SPARK_MIN_EXECUTOR_COUNT, "SPARK_MIN_EXECUTOR_COUNT", 10, false),
            PropertyMetadata.integerProperty(SPARK_MAX_EXECUTOR_COUNT, "SPARK_MAX_EXECUTOR_COUNT", 1000, false),
            PropertyMetadata.integerProperty(SPARK_MIN_HASH_PARTITION_COUNT, "SPARK_MIN_HASH_PARTITION_COUNT", 10, false),
            PropertyMetadata.integerProperty(SPARK_MAX_HASH_PARTITION_COUNT, "SPARK_MAX_HASH_PARTITION_COUNT", 1000, false),
            PropertyMetadata.dataSizeProperty(
                    SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR,
                    "SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR",
                    new DataSize(200, DataSize.Unit.BYTE),
                    false),
            PropertyMetadata.dataSizeProperty(
                    SPARK_AVERAGE_INPUT_DATA_SIZE_PER_PARTITION,
                    "SPARK_AVERAGE_INPUT_DATA_SIZE_PER_PARTITION",
                    new DataSize(100, DataSize.Unit.BYTE),
                    false),
            PropertyMetadata.integerProperty(HASH_PARTITION_COUNT, "HASH_PARTITION_COUNT", 150, false)
    };

    // Session in which the resource allocation strategy property defaults to true.
    private static final Session SESSION_WITH_ALLOCATION = sessionWith(
            PropertyMetadata.booleanProperty(SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED, "SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED", true, false));

    // Session in which all three allocation strategy properties default to false.
    private static final Session SESSION_WITHOUT_ALLOCATION = sessionWith(
            PropertyMetadata.booleanProperty(SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED, "SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED", false, false),
            PropertyMetadata.booleanProperty(SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED, "SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED", false, false),
            PropertyMetadata.booleanProperty(SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED, "SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED", false, false));

    // Metadata reporting a total size of 1000 for every table.
    private static final Metadata KNOWN_SIZE_METADATA = new MockedMetadata(Estimate.of(1000));

    // Metadata reporting an unknown total size for every table.
    private static final Metadata UNKNOWN_SIZE_METADATA = new MockedMetadata(Estimate.unknown());

    /**
     * Builds a testing session whose property manager knows the common properties
     * followed by the given additional ones.
     *
     * @param extraProperties property definitions appended after the common ones
     * @return a session created through the testing session builder
     */
    private static Session sessionWith(PropertyMetadata<?>... extraProperties)
    {
        ImmutableList<PropertyMetadata<?>> properties = ImmutableList.<PropertyMetadata<?>>builder()
                .add(COMMON_PROPERTIES)
                .add(extraProperties)
                .build();
        return testSessionBuilder(createTestingSessionPropertyManager(properties)).build();
    }

    /**
     * Produces a small plan: a LEFT join over two single-column table scans.
     * The plan does not need to be semantically meaningful; it only has to be
     * traversable so that source statistics can be gathered from its scans.
     */
    private PlanNode getPlanToTest(Session session, Metadata metadata)
    {
        PlanBuilder builder = new PlanBuilder(session, new PlanNodeIdAllocator(), metadata);

        VariableReferenceExpression leftColumn = builder.variable("sourceJoin");
        TableScanNode leftScan = builder.tableScan(
                ImmutableList.of(leftColumn),
                ImmutableMap.of(leftColumn, new TestingMetadata.TestingColumnHandle("sourceJoin")));

        VariableReferenceExpression rightColumn = builder.variable("filteringSource");
        TableScanNode rightScan = builder.tableScan(
                ImmutableList.of(rightColumn),
                ImmutableMap.of(rightColumn, new TestingMetadata.TestingColumnHandle("filteringSource")));

        return builder.join(JoinType.LEFT, leftScan, rightScan);
    }

    /**
     * Runs the physical resource calculator over the test plan and returns the settings it produces.
     * <p>
     * NOTE(review): the constructor arguments (150, 100) match the values the tests expect when
     * no tuning takes place; presumably they are the fallback hash partition count and
     * max executor count - confirm against PrestoSparkPhysicalResourceCalculator.
     */
    private PhysicalResourceSettings getSettingsHolder(Session session, Metadata metadata)
    {
        PrestoSparkSourceStatsCollector statsCollector = new PrestoSparkSourceStatsCollector(metadata, session);
        PlanNode plan = getPlanToTest(session, metadata);
        PrestoSparkPhysicalResourceCalculator calculator = new PrestoSparkPhysicalResourceCalculator(150, 100);
        return calculator.calculate(plan, statsCollector, session);
    }

    @Test
    public void testHashPartitionCountAllocationStrategy()
    {
        PhysicalResourceSettings settings = getSettingsHolder(SESSION_WITH_ALLOCATION, KNOWN_SIZE_METADATA);

        // NOTE(review): presumably 2 scans x 1000 bytes, divided by 100 bytes per partition
        // and 200 bytes per executor respectively - confirm against the calculator.
        assertEquals(settings.getHashPartitionCount(), 20);
        assertEquals(settings.getMaxExecutorCount(), 10);
    }

    @Test
    public void testStrategyWithUnknownEstimate()
    {
        PhysicalResourceSettings settings = getSettingsHolder(SESSION_WITH_ALLOCATION, UNKNOWN_SIZE_METADATA);

        // With an unknown table size the expected values equal the calculator's constructor arguments.
        assertEquals(settings.getHashPartitionCount(), 150);
        assertEquals(settings.getMaxExecutorCount(), 100);
    }

    @Test
    public void testHashPartitionCountWithoutAllocationStrategy()
    {
        PhysicalResourceSettings settings = getSettingsHolder(SESSION_WITHOUT_ALLOCATION, KNOWN_SIZE_METADATA);

        // With every strategy switched off the expected values equal the calculator's constructor arguments.
        assertEquals(settings.getHashPartitionCount(), 150);
        assertEquals(settings.getMaxExecutorCount(), 100);
    }
}