// TestPrestoSparkConfig.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spark;

import com.facebook.airlift.configuration.testing.ConfigAssertions;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.Map;

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.MINUTES;

/**
 * Configuration-mapping tests for {@code PrestoSparkConfig}, following the standard
 * airlift {@code ConfigAssertions} idiom: one test pins every default value, and one
 * test proves each property name is wired to its setter with correct value parsing.
 */
public class TestPrestoSparkConfig
{
    /**
     * Pins the built-in defaults of {@code PrestoSparkConfig}. If a default changes in
     * the config class, this test must be updated in lockstep — which makes default
     * changes explicit and reviewable rather than silent.
     */
    @Test
    public void testDefaults()
    {
        assertRecordedDefaults(ConfigAssertions.recordDefaults(PrestoSparkConfig.class)
                .setSparkPartitionCountAutoTuneEnabled(true)
                .setInitialSparkPartitionCount(16)
                .setMinSparkInputPartitionCountForAutoTune(100)
                .setMaxSparkInputPartitionCountForAutoTune(1000)
                .setMaxSplitsDataSizePerSparkPartition(new DataSize(2, GIGABYTE))
                .setShuffleOutputTargetAverageRowSize(new DataSize(1, KILOBYTE))
                .setStorageBasedBroadcastJoinEnabled(false)
                .setStorageBasedBroadcastJoinStorage("local")
                .setStorageBasedBroadcastJoinWriteBufferSize(new DataSize(24, MEGABYTE))
                .setSparkBroadcastJoinMaxMemoryOverride(null)
                .setSmileSerializationEnabled(true)
                .setSplitAssignmentBatchSize(1_000_000)
                .setMemoryRevokingThreshold(0)
                .setMemoryRevokingTarget(0)
                .setRetryOnOutOfMemoryBroadcastJoinEnabled(false)
                .setRetryOnOutOfMemoryWithIncreasedMemorySettingsEnabled(false)
                .setOutOfMemoryRetryPrestoSessionProperties("")
                .setOutOfMemoryRetrySparkConfigs("")
                .setAverageInputDataSizePerExecutor(new DataSize(10, GIGABYTE))
                .setMaxExecutorCount(600)
                .setMinExecutorCount(200)
                .setAverageInputDataSizePerPartition(new DataSize(2, GIGABYTE))
                .setMaxHashPartitionCount(4096)
                .setMinHashPartitionCount(1024)
                .setSparkResourceAllocationStrategyEnabled(false)
                .setRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled(false)
                .setHashPartitionCountScalingFactorOnOutOfMemory(2.0)
                .setAdaptiveQueryExecutionEnabled(false)
                .setAdaptiveJoinSideSwitchingEnabled(false)
                .setExecutorAllocationStrategyEnabled(false)
                .setHashPartitionCountAllocationStrategyEnabled(false)
                .setNativeExecutionBroadcastBasePath(null)
                .setNativeTerminateWithCoreWhenUnresponsiveEnabled(false)
                .setNativeTerminateWithCoreTimeout(new Duration(5, MINUTES))
                .setDynamicPrestoMemoryPoolTuningEnabled(false)
                .setDynamicPrestoMemoryPoolTuningFraction(0.7)
                .setAttemptNumberToApplyDynamicMemoryPoolTuning(1));
    }

    /**
     * Verifies that every externally visible property name maps to its setter and that
     * the raw string values are parsed into the expected typed values. Each value below
     * deliberately differs from the corresponding default so the mapping is actually
     * exercised (a value equal to the default would pass even with a broken mapping).
     * The setter chain is kept in the same order as the property map to make
     * line-by-line cross-checking easy.
     */
    @Test
    public void testExplicitPropertyMappings()
    {
        Map<String, String> properties = new ImmutableMap.Builder<String, String>()
                .put("spark.partition-count-auto-tune-enabled", "false")
                .put("spark.initial-partition-count", "128")
                .put("spark.min-spark-input-partition-count-for-auto-tune", "200")
                .put("spark.max-spark-input-partition-count-for-auto-tune", "2000")
                .put("spark.max-splits-data-size-per-partition", "4GB")
                .put("spark.shuffle-output-target-average-row-size", "10kB")
                .put("spark.storage-based-broadcast-join-enabled", "true")
                .put("spark.storage-based-broadcast-join-storage", "tempfs")
                .put("spark.storage-based-broadcast-join-write-buffer-size", "4MB")
                .put("spark.broadcast-join-max-memory-override", "1GB")
                .put("spark.smile-serialization-enabled", "false")
                .put("spark.split-assignment-batch-size", "420")
                .put("spark.memory-revoking-threshold", "0.5")
                .put("spark.memory-revoking-target", "0.5")
                .put("spark.retry-on-out-of-memory-broadcast-join-enabled", "true")
                .put("spark.retry-on-out-of-memory-with-increased-memory-settings-enabled", "true")
                .put("spark.retry-presto-session-properties", "query_max_memory_per_node=1MB,query_max_total_memory_per_node=1MB")
                .put("spark.retry-spark-configs", "spark.executor.memory=1g,spark.task.cpus=5")
                .put("spark.average-input-datasize-per-executor", "5GB")
                .put("spark.max-executor-count", "29")
                .put("spark.min-executor-count", "2")
                .put("spark.average-input-datasize-per-partition", "1GB")
                .put("spark.max-hash-partition-count", "333")
                .put("spark.min-hash-partition-count", "30")
                .put("spark.resource-allocation-strategy-enabled", "true")
                .put("spark.retry-on-out-of-memory-higher-hash-partition-count-enabled", "true")
                .put("spark.hash-partition-count-scaling-factor-on-out-of-memory", "5.6")
                .put("spark.adaptive-query-execution-enabled", "true")
                .put("optimizer.adaptive-join-side-switching-enabled", "true")
                .put("spark.executor-allocation-strategy-enabled", "true")
                .put("spark.hash-partition-count-allocation-strategy-enabled", "true")
                .put("native-execution-broadcast-base-path", "/tmp/broadcast_path")
                .put("native-terminate-with-core-when-unresponsive-enabled", "true")
                .put("native-terminate-with-core-timeout", "1m")
                .put("spark.dynamic-presto-memory-pool-tuning-enabled", "true")
                .put("spark.dynamic-presto-memory-pool-tuning-fraction", "0.8")
                .put("spark.attempt-number-to-apply-dynamic-memory-pool-tuning", "0")
                .build();
        PrestoSparkConfig expected = new PrestoSparkConfig()
                .setSparkPartitionCountAutoTuneEnabled(false)
                .setInitialSparkPartitionCount(128)
                .setMinSparkInputPartitionCountForAutoTune(200)
                .setMaxSparkInputPartitionCountForAutoTune(2000)
                .setMaxSplitsDataSizePerSparkPartition(new DataSize(4, GIGABYTE))
                .setShuffleOutputTargetAverageRowSize(new DataSize(10, KILOBYTE))
                .setStorageBasedBroadcastJoinEnabled(true)
                .setStorageBasedBroadcastJoinStorage("tempfs")
                .setStorageBasedBroadcastJoinWriteBufferSize(new DataSize(4, MEGABYTE))
                .setSparkBroadcastJoinMaxMemoryOverride(new DataSize(1, GIGABYTE))
                .setSmileSerializationEnabled(false)
                .setSplitAssignmentBatchSize(420)
                .setMemoryRevokingThreshold(0.5)
                .setMemoryRevokingTarget(0.5)
                .setRetryOnOutOfMemoryBroadcastJoinEnabled(true)
                .setRetryOnOutOfMemoryWithIncreasedMemorySettingsEnabled(true)
                .setOutOfMemoryRetryPrestoSessionProperties("query_max_memory_per_node=1MB,query_max_total_memory_per_node=1MB")
                .setOutOfMemoryRetrySparkConfigs("spark.executor.memory=1g,spark.task.cpus=5")
                .setAverageInputDataSizePerExecutor(new DataSize(5, GIGABYTE))
                .setMaxExecutorCount(29)
                .setMinExecutorCount(2)
                .setAverageInputDataSizePerPartition(new DataSize(1, GIGABYTE))
                .setMaxHashPartitionCount(333)
                .setMinHashPartitionCount(30)
                .setSparkResourceAllocationStrategyEnabled(true)
                .setRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled(true)
                .setHashPartitionCountScalingFactorOnOutOfMemory(5.6)
                .setAdaptiveQueryExecutionEnabled(true)
                .setAdaptiveJoinSideSwitchingEnabled(true)
                .setExecutorAllocationStrategyEnabled(true)
                .setHashPartitionCountAllocationStrategyEnabled(true)
                .setNativeExecutionBroadcastBasePath("/tmp/broadcast_path")
                .setNativeTerminateWithCoreWhenUnresponsiveEnabled(true)
                .setNativeTerminateWithCoreTimeout(new Duration(1, MINUTES))
                .setDynamicPrestoMemoryPoolTuningEnabled(true)
                .setDynamicPrestoMemoryPoolTuningFraction(0.8)
                .setAttemptNumberToApplyDynamicMemoryPoolTuning(0);
        assertFullMapping(properties, expected);
    }
}