JavaWorkerSessionPropertyProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sessionpropertyproviders;
import com.facebook.presto.Session;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.session.WorkerSessionPropertyProvider;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.JavaFeaturesConfig;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import java.util.List;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
public class JavaWorkerSessionPropertyProvider
implements WorkerSessionPropertyProvider
{
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
public static final String TOPN_SPILL_ENABLED = "topn_spill_enabled";
public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled";
public static final String DEDUP_BASED_DISTINCT_AGGREGATION_SPILL_ENABLED = "dedup_based_distinct_aggregation_spill_enabled";
public static final String DISTINCT_AGGREGATION_LARGE_BLOCK_SPILL_ENABLED = "distinct_aggregation_large_block_spill_enabled";
public static final String DISTINCT_AGGREGATION_LARGE_BLOCK_SIZE_THRESHOLD = "distinct_aggregation_large_block_size_threshold";
public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled";
public static final String WINDOW_SPILL_ENABLED = "window_spill_enabled";
public static final String ORDER_BY_SPILL_ENABLED = "order_by_spill_enabled";
public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit";
public static final String TOPN_OPERATOR_UNSPILL_MEMORY_LIMIT = "topn_operator_unspill_memory_limit";
public static final String TEMP_STORAGE_SPILLER_BUFFER_SIZE = "temp_storage_spiller_buffer_size";
private final List<PropertyMetadata<?>> sessionProperties;
@Inject
public JavaWorkerSessionPropertyProvider(FeaturesConfig featuresConfig, JavaFeaturesConfig javaFeaturesConfig, NodeSpillConfig nodeSpillConfig)
{
boolean nativeExecution = requireNonNull(featuresConfig, "featuresConfig is null").isNativeExecutionEnabled();
sessionProperties = ImmutableList.of(
booleanProperty(
TOPN_SPILL_ENABLED,
"Enable topN spilling if spill_enabled",
javaFeaturesConfig.isTopNSpillEnabled(),
nativeExecution),
booleanProperty(
AGGREGATION_SPILL_ENABLED,
"Enable aggregate spilling if spill_enabled",
javaFeaturesConfig.isAggregationSpillEnabled(),
nativeExecution),
booleanProperty(
DISTINCT_AGGREGATION_SPILL_ENABLED,
"Enable spill for distinct aggregations if spill_enabled and aggregation_spill_enabled",
javaFeaturesConfig.isDistinctAggregationSpillEnabled(),
nativeExecution),
booleanProperty(
DEDUP_BASED_DISTINCT_AGGREGATION_SPILL_ENABLED,
"Perform deduplication of input data for distinct aggregates before spilling",
javaFeaturesConfig.isDedupBasedDistinctAggregationSpillEnabled(),
nativeExecution),
booleanProperty(
DISTINCT_AGGREGATION_LARGE_BLOCK_SPILL_ENABLED,
"Spill large block to a separate spill file",
javaFeaturesConfig.isDistinctAggregationLargeBlockSpillEnabled(),
nativeExecution),
new PropertyMetadata<>(
DISTINCT_AGGREGATION_LARGE_BLOCK_SIZE_THRESHOLD,
"Block size threshold beyond which it will be spilled into a separate spill file",
VARCHAR,
DataSize.class,
javaFeaturesConfig.getDistinctAggregationLargeBlockSizeThreshold(),
nativeExecution,
value -> DataSize.valueOf((String) value),
DataSize::toString),
booleanProperty(
ORDER_BY_AGGREGATION_SPILL_ENABLED,
"Enable spill for order-by aggregations if spill_enabled and aggregation_spill_enabled",
javaFeaturesConfig.isOrderByAggregationSpillEnabled(),
nativeExecution),
booleanProperty(
WINDOW_SPILL_ENABLED,
"Enable window spilling if spill_enabled",
javaFeaturesConfig.isWindowSpillEnabled(),
nativeExecution),
booleanProperty(
ORDER_BY_SPILL_ENABLED,
"Enable order by spilling if spill_enabled",
javaFeaturesConfig.isOrderBySpillEnabled(),
nativeExecution),
new PropertyMetadata<>(
AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT,
"Experimental: How much memory can should be allocated per aggregation operator in unspilling process",
VARCHAR,
DataSize.class,
javaFeaturesConfig.getAggregationOperatorUnspillMemoryLimit(),
nativeExecution,
value -> DataSize.valueOf((String) value),
DataSize::toString),
new PropertyMetadata<>(
TOPN_OPERATOR_UNSPILL_MEMORY_LIMIT,
"How much memory can should be allocated per topN operator in unspilling process",
VARCHAR,
DataSize.class,
javaFeaturesConfig.getTopNOperatorUnspillMemoryLimit(),
nativeExecution,
value -> DataSize.valueOf((String) value),
DataSize::toString),
new PropertyMetadata<>(
TEMP_STORAGE_SPILLER_BUFFER_SIZE,
"Experimental: Buffer size used by TempStorageSingleStreamSpiller",
VARCHAR,
DataSize.class,
nodeSpillConfig.getTempStorageBufferSize(),
nativeExecution,
value -> DataSize.valueOf((String) value),
DataSize::toString));
}
@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}
public static boolean isTopNSpillEnabled(Session session)
{
return session.getSystemProperty(TOPN_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}
public static boolean isAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}
public static boolean isDistinctAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session);
}
public static boolean isDedupBasedDistinctAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(DEDUP_BASED_DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class);
}
public static boolean isDistinctAggregationLargeBlockSpillEnabled(Session session)
{
return session.getSystemProperty(DISTINCT_AGGREGATION_LARGE_BLOCK_SPILL_ENABLED, Boolean.class);
}
public static DataSize getDistinctAggregationLargeBlockSizeThreshold(Session session)
{
return session.getSystemProperty(DISTINCT_AGGREGATION_LARGE_BLOCK_SIZE_THRESHOLD, DataSize.class);
}
public static boolean isOrderByAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session);
}
public static boolean isWindowSpillEnabled(Session session)
{
return session.getSystemProperty(WINDOW_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}
public static boolean isOrderBySpillEnabled(Session session)
{
return session.getSystemProperty(ORDER_BY_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}
public static DataSize getAggregationOperatorUnspillMemoryLimit(Session session)
{
DataSize memoryLimitForMerge = session.getSystemProperty(AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, DataSize.class);
checkArgument(memoryLimitForMerge.toBytes() >= 0, "%s must be positive", AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT);
return memoryLimitForMerge;
}
public static DataSize getTopNOperatorUnspillMemoryLimit(Session session)
{
DataSize unspillMemoryLimit = session.getSystemProperty(TOPN_OPERATOR_UNSPILL_MEMORY_LIMIT, DataSize.class);
checkArgument(unspillMemoryLimit.toBytes() >= 0, "%s must be positive", TOPN_OPERATOR_UNSPILL_MEMORY_LIMIT);
return unspillMemoryLimit;
}
public static DataSize getTempStorageSpillerBufferSize(Session session)
{
DataSize tempStorageSpillerBufferSize = session.getSystemProperty(TEMP_STORAGE_SPILLER_BUFFER_SIZE, DataSize.class);
checkArgument(tempStorageSpillerBufferSize.toBytes() >= 0, "%s must be positive", TEMP_STORAGE_SPILLER_BUFFER_SIZE);
return tempStorageSpillerBufferSize;
}
}