TaskManagerConfig.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.DefunctConfig;
import com.facebook.airlift.configuration.LegacyConfig;
import com.facebook.presto.memory.HighMemoryTaskKillerStrategy;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import io.airlift.units.Duration;
import io.airlift.units.MaxDuration;
import io.airlift.units.MinDuration;
import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Mutable configuration bean holding task-management settings.
 * <p>
 * Each setter is bound to a configuration property through {@link Config} and returns
 * {@code this} so calls can be chained. Validation constraints are declared on the getters.
 * Property names listed in {@link DefunctConfig} are no longer supported by this class.
 */
@DefunctConfig({
        "experimental.big-query-max-task-memory",
        "task.max-memory",
        "task.http-notification-threads",
        "task.info-refresh-max-wait",
        "task.operator-pre-allocated-memory",
        "sink.new-implementation",
        "task.legacy-scheduling-behavior",
        "task.level-absolute-priority"})
public class TaskManagerConfig
{
    private boolean perOperatorCpuTimerEnabled = true;
    private boolean taskCpuTimerEnabled = true;
    private boolean statisticsCpuTimerEnabled = true;
    private boolean perOperatorAllocationTrackingEnabled;
    private boolean taskAllocationTrackingEnabled;
    private boolean taskUpdateSizeTrackingEnabled = true;
    private DataSize maxPartialAggregationMemoryUsage = new DataSize(16, Unit.MEGABYTE);
    private DataSize maxLocalExchangeBufferSize = new DataSize(32, Unit.MEGABYTE);
    private DataSize maxIndexMemoryUsage = new DataSize(64, Unit.MEGABYTE);
    private boolean shareIndexLoading;
    // Default worker thread count is derived from the machine: two threads per available processor.
    private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 2;
    // Nullable on purpose: when unset, the getter derives the value from maxWorkerThreads.
    private Integer minDrivers;
    // Nullable on purpose: when unset, the getter falls back to maxWorkerThreads.
    private Integer initialSplitsPerNode;
    private int minDriversPerTask = 3;
    private int maxDriversPerTask = Integer.MAX_VALUE;
    private int maxTasksPerStage = Integer.MAX_VALUE;
    private Duration splitConcurrencyAdjustmentInterval = new Duration(100, TimeUnit.MILLISECONDS);
    private DataSize sinkMaxBufferSize = new DataSize(32, Unit.MEGABYTE);
    private DataSize maxPagePartitioningBufferSize = new DataSize(32, Unit.MEGABYTE);
    private Duration clientTimeout = new Duration(2, TimeUnit.MINUTES);
    private Duration infoMaxAge = new Duration(15, TimeUnit.MINUTES);
    private Duration statusRefreshMaxWait = new Duration(1, TimeUnit.SECONDS);
    private Duration infoRefreshMaxWait = new Duration(0, TimeUnit.SECONDS);
    private Duration infoUpdateInterval = new Duration(3, TimeUnit.SECONDS);
    private int writerCount = 1;
    // Nullable on purpose: null means "not configured".
    private Integer partitionedWriterCount;
    private int taskConcurrency = 16;
    private int httpResponseThreads = 100;
    private int httpTimeoutConcurrency = 3;
    private int httpTimeoutThreads = 3;
    private int taskNotificationThreads = 5;
    private int taskYieldThreads = 3;
    // String constructor instead of BigDecimal(double): same value and scale, without going through a binary double.
    private BigDecimal levelTimeMultiplier = new BigDecimal("2");
    private boolean legacyLifespanCompletionCondition;
    private TaskPriorityTracking taskPriorityTracking = TaskPriorityTracking.TASK_FAIR;
    private Duration interruptRunawaySplitsTimeout = new Duration(600, SECONDS);
    private double memoryBasedSlowDownThreshold = 1.0;
    private HighMemoryTaskKillerStrategy highMemoryTaskKillerStrategy = HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC;
    private boolean highMemoryTaskKillerEnabled;
    private double highMemoryTaskKillerGCReclaimMemoryThreshold = 0.01;
    private Duration highMemoryTaskKillerFrequentFullGCDurationThreshold = new Duration(1, SECONDS);
    private double highMemoryTaskKillerHeapMemoryThreshold = 0.9;
    private boolean enableEventLoop;
    private Duration slowMethodThresholdOnEventLoop = new Duration(0, SECONDS);

    /**
     * Returns the configured event-loop slow-method threshold converted to nanoseconds.
     * The default duration is zero seconds.
     */
    public long getSlowMethodThresholdOnEventLoop()
    {
        return slowMethodThresholdOnEventLoop.roundTo(NANOSECONDS);
    }

    @Config("task.event-loop-slow-method-threshold")
    public TaskManagerConfig setSlowMethodThresholdOnEventLoop(Duration slowMethodThresholdOnEventLoop)
    {
        this.slowMethodThresholdOnEventLoop = slowMethodThresholdOnEventLoop;
        return this;
    }

    @Config("task.enable-event-loop")
    public TaskManagerConfig setEventLoopEnabled(boolean enableEventLoop)
    {
        this.enableEventLoop = enableEventLoop;
        return this;
    }

    /**
     * Returns the value of {@code task.enable-event-loop}; {@code false} unless configured.
     */
    public boolean isEventLoopEnabled()
    {
        return enableEventLoop;
    }

    /**
     * Returns the value of {@code task.status-refresh-max-wait} (default 1 second),
     * constrained to the range 1ms to 10s.
     */
    @MinDuration("1ms")
    @MaxDuration("10s")
    @NotNull
    public Duration getStatusRefreshMaxWait()
    {
        return statusRefreshMaxWait;
    }

    @Config("task.status-refresh-max-wait")
    public TaskManagerConfig setStatusRefreshMaxWait(Duration statusRefreshMaxWait)
    {
        this.statusRefreshMaxWait = statusRefreshMaxWait;
        return this;
    }

    /**
     * Returns the interval between task data updates (default 3 seconds),
     * constrained to the range 1ms to 10s.
     */
    @MinDuration("1ms")
    @MaxDuration("10s")
    @NotNull
    public Duration getInfoUpdateInterval()
    {
        return infoUpdateInterval;
    }

    @Config("task.info-update-interval")
    @ConfigDescription("Interval between updating task data")
    public TaskManagerConfig setInfoUpdateInterval(Duration infoUpdateInterval)
    {
        this.infoUpdateInterval = infoUpdateInterval;
        return this;
    }

    /**
     * Returns the maximum wait for a task info update refresh; the default is zero seconds.
     */
    @NotNull
    public Duration getInfoRefreshMaxWait()
    {
        return infoRefreshMaxWait;
    }

    @Config("experimental.task.info-update-refresh-max-wait")
    @ConfigDescription("When this is set to non-zero, task info update request will be a long polling with " +
            "given maximum update refresh wait time. This is an experimental config to reduce unnecessary task info update.")
    public TaskManagerConfig setInfoRefreshMaxWait(Duration infoRefreshMaxWait)
    {
        this.infoRefreshMaxWait = infoRefreshMaxWait;
        return this;
    }

    /**
     * Returns the value of {@code task.per-operator-cpu-timer-enabled}; defaults to {@code true}.
     */
    public boolean isPerOperatorCpuTimerEnabled()
    {
        return perOperatorCpuTimerEnabled;
    }

    // The legacy property name task.verbose-stats is still accepted for this setting.
    @LegacyConfig("task.verbose-stats")
    @Config("task.per-operator-cpu-timer-enabled")
    public TaskManagerConfig setPerOperatorCpuTimerEnabled(boolean perOperatorCpuTimerEnabled)
    {
        this.perOperatorCpuTimerEnabled = perOperatorCpuTimerEnabled;
        return this;
    }

    /**
     * Returns the value of {@code task.cpu-timer-enabled}; defaults to {@code true}.
     */
    public boolean isTaskCpuTimerEnabled()
    {
        return taskCpuTimerEnabled;
    }

    @Config("task.cpu-timer-enabled")
    public TaskManagerConfig setTaskCpuTimerEnabled(boolean taskCpuTimerEnabled)
    {
        this.taskCpuTimerEnabled = taskCpuTimerEnabled;
        return this;
    }

    /**
     * Returns the value of {@code task.statistics-cpu-timer-enabled}; defaults to {@code true}.
     */
    public boolean isStatisticsCpuTimerEnabled()
    {
        return statisticsCpuTimerEnabled;
    }

    @Config("task.statistics-cpu-timer-enabled")
    public TaskManagerConfig setStatisticsCpuTimerEnabled(boolean statisticsCpuTimerEnabled)
    {
        this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled;
        return this;
    }

    /**
     * Returns the value of {@code task.per-operator-allocation-tracking-enabled}; {@code false} unless configured.
     */
    public boolean isPerOperatorAllocationTrackingEnabled()
    {
        return perOperatorAllocationTrackingEnabled;
    }

    @Config("task.per-operator-allocation-tracking-enabled")
    public TaskManagerConfig setPerOperatorAllocationTrackingEnabled(boolean perOperatorAllocationTrackingEnabled)
    {
        this.perOperatorAllocationTrackingEnabled = perOperatorAllocationTrackingEnabled;
        return this;
    }

    /**
     * Returns the value of {@code task.allocation-tracking-enabled}; {@code false} unless configured.
     */
    public boolean isTaskAllocationTrackingEnabled()
    {
        return taskAllocationTrackingEnabled;
    }

    @Config("task.allocation-tracking-enabled")
    public TaskManagerConfig setTaskAllocationTrackingEnabled(boolean taskAllocationTrackingEnabled)
    {
        this.taskAllocationTrackingEnabled = taskAllocationTrackingEnabled;
        return this;
    }

    /**
     * Returns the value of {@code task.max-partial-aggregation-memory}; defaults to 16MB.
     */
    @NotNull
    public DataSize getMaxPartialAggregationMemoryUsage()
    {
        return maxPartialAggregationMemoryUsage;
    }

    @Config("task.max-partial-aggregation-memory")
    public TaskManagerConfig setMaxPartialAggregationMemoryUsage(DataSize maxPartialAggregationMemoryUsage)
    {
        this.maxPartialAggregationMemoryUsage = maxPartialAggregationMemoryUsage;
        return this;
    }

    /**
     * Returns the value of {@code task.max-local-exchange-buffer-size}; defaults to 32MB.
     */
    @NotNull
    public DataSize getMaxLocalExchangeBufferSize()
    {
        return maxLocalExchangeBufferSize;
    }

    @Config("task.max-local-exchange-buffer-size")
    public TaskManagerConfig setMaxLocalExchangeBufferSize(DataSize size)
    {
        this.maxLocalExchangeBufferSize = size;
        return this;
    }

    /**
     * Returns the value of {@code task.max-index-memory}; defaults to 64MB.
     */
    @NotNull
    public DataSize getMaxIndexMemoryUsage()
    {
        return maxIndexMemoryUsage;
    }

    @Config("task.max-index-memory")
    public TaskManagerConfig setMaxIndexMemoryUsage(DataSize maxIndexMemoryUsage)
    {
        this.maxIndexMemoryUsage = maxIndexMemoryUsage;
        return this;
    }

    /**
     * Returns the value of {@code task.share-index-loading}; {@code false} unless configured.
     */
    // No @NotNull here: the return type is a primitive and can never be null.
    public boolean isShareIndexLoading()
    {
        return shareIndexLoading;
    }

    @Config("task.share-index-loading")
    public TaskManagerConfig setShareIndexLoading(boolean shareIndexLoading)
    {
        this.shareIndexLoading = shareIndexLoading;
        return this;
    }

    /**
     * Returns the level time multiplier; defaults to 2 and must be non-null and at least 0.
     */
    @Min(0)
    @NotNull
    public BigDecimal getLevelTimeMultiplier()
    {
        return levelTimeMultiplier;
    }

    @Config("task.level-time-multiplier")
    @ConfigDescription("Factor that determines the target scheduled time for a level relative to the next")
    public TaskManagerConfig setLevelTimeMultiplier(BigDecimal levelTimeMultiplier)
    {
        this.levelTimeMultiplier = levelTimeMultiplier;
        return this;
    }

    /**
     * Returns the maximum number of worker threads; defaults to twice the number of
     * available processors and must be at least 1.
     */
    @Min(1)
    public int getMaxWorkerThreads()
    {
        return maxWorkerThreads;
    }

    /**
     * Sets the maximum number of worker threads from its textual form, which is
     * converted to an integer by {@code ThreadCountParser.parse}.
     * The legacy property name {@code task.shard.max-threads} is still accepted.
     */
    @LegacyConfig("task.shard.max-threads")
    @Config("task.max-worker-threads")
    public TaskManagerConfig setMaxWorkerThreads(String maxWorkerThreads)
    {
        this.maxWorkerThreads = ThreadCountParser.parse(maxWorkerThreads);
        return this;
    }

    /**
     * Returns the configured initial splits per node, or the maximum worker thread
     * count when {@code task.initial-splits-per-node} has not been set.
     */
    @Min(1)
    public int getInitialSplitsPerNode()
    {
        if (initialSplitsPerNode == null) {
            return maxWorkerThreads;
        }
        return initialSplitsPerNode;
    }

    @Config("task.initial-splits-per-node")
    public TaskManagerConfig setInitialSplitsPerNode(int initialSplitsPerNode)
    {
        this.initialSplitsPerNode = initialSplitsPerNode;
        return this;
    }

    /**
     * Returns the split concurrency adjustment interval; defaults to 100ms and must be
     * non-null and at least 1ms.
     */
    @MinDuration("1ms")
    @NotNull
    public Duration getSplitConcurrencyAdjustmentInterval()
    {
        return splitConcurrencyAdjustmentInterval;
    }

    @Config("task.split-concurrency-adjustment-interval")
    public TaskManagerConfig setSplitConcurrencyAdjustmentInterval(Duration splitConcurrencyAdjustmentInterval)
    {
        this.splitConcurrencyAdjustmentInterval = splitConcurrencyAdjustmentInterval;
        return this;
    }

    /**
     * Returns the configured minimum number of drivers, or twice the maximum worker
     * thread count when {@code task.min-drivers} has not been set.
     */
    @Min(1)
    public int getMinDrivers()
    {
        if (minDrivers == null) {
            return 2 * maxWorkerThreads;
        }
        return minDrivers;
    }

    @Config("task.min-drivers")
    public TaskManagerConfig setMinDrivers(int minDrivers)
    {
        this.minDrivers = minDrivers;
        return this;
    }

    /**
     * Returns the maximum number of drivers a task can run; defaults to {@link Integer#MAX_VALUE}.
     */
    @Min(1)
    public int getMaxDriversPerTask()
    {
        return maxDriversPerTask;
    }

    @Config("task.max-drivers-per-task")
    @ConfigDescription("Maximum number of drivers a task can run")
    public TaskManagerConfig setMaxDriversPerTask(int maxDriversPerTask)
    {
        this.maxDriversPerTask = maxDriversPerTask;
        return this;
    }

    /**
     * Returns the minimum number of drivers per task; defaults to 3.
     */
    @Min(1)
    public int getMinDriversPerTask()
    {
        return minDriversPerTask;
    }

    @Config("task.min-drivers-per-task")
    @ConfigDescription("Minimum number of drivers guaranteed to run per task given there is sufficient work to do")
    public TaskManagerConfig setMinDriversPerTask(int minDriversPerTask)
    {
        this.minDriversPerTask = minDriversPerTask;
        return this;
    }

    /**
     * Returns the maximum number of tasks per stage; defaults to {@link Integer#MAX_VALUE}.
     */
    @Min(1)
    public int getMaxTasksPerStage()
    {
        return maxTasksPerStage;
    }

    @Config("stage.max-tasks-per-stage")
    @ConfigDescription("Maximum number of tasks for a non source distributed stage")
    public TaskManagerConfig setMaxTasksPerStage(int maxTasksPerStage)
    {
        this.maxTasksPerStage = maxTasksPerStage;
        return this;
    }

    /**
     * Returns the value of {@code sink.max-buffer-size}; defaults to 32MB.
     */
    @NotNull
    public DataSize getSinkMaxBufferSize()
    {
        return sinkMaxBufferSize;
    }

    @Config("sink.max-buffer-size")
    public TaskManagerConfig setSinkMaxBufferSize(DataSize sinkMaxBufferSize)
    {
        this.sinkMaxBufferSize = sinkMaxBufferSize;
        return this;
    }

    /**
     * Returns the value of {@code driver.max-page-partitioning-buffer-size}; defaults to 32MB.
     */
    @NotNull
    public DataSize getMaxPagePartitioningBufferSize()
    {
        return maxPagePartitioningBufferSize;
    }

    @Config("driver.max-page-partitioning-buffer-size")
    public TaskManagerConfig setMaxPagePartitioningBufferSize(DataSize size)
    {
        this.maxPagePartitioningBufferSize = size;
        return this;
    }

    /**
     * Returns the value of {@code task.client.timeout}; defaults to 2 minutes and must be at least 5s.
     */
    @MinDuration("5s")
    @NotNull
    public Duration getClientTimeout()
    {
        return clientTimeout;
    }

    @Config("task.client.timeout")
    public TaskManagerConfig setClientTimeout(Duration clientTimeout)
    {
        this.clientTimeout = clientTimeout;
        return this;
    }

    /**
     * Returns the value of {@code task.info.max-age}; defaults to 15 minutes.
     */
    @NotNull
    public Duration getInfoMaxAge()
    {
        return infoMaxAge;
    }

    @Config("task.info.max-age")
    public TaskManagerConfig setInfoMaxAge(Duration infoMaxAge)
    {
        this.infoMaxAge = infoMaxAge;
        return this;
    }

    /**
     * Returns the number of writer threads per task; defaults to 1.
     */
    @Min(1)
    public int getWriterCount()
    {
        return writerCount;
    }

    // NOTE: writer count needs to be a power of two for java query engine.
    @Config("task.writer-count")
    @ConfigDescription("Number of writer threads per task")
    public TaskManagerConfig setWriterCount(int writerCount)
    {
        this.writerCount = writerCount;
        return this;
    }

    /**
     * Returns the configured partitioned writer count, or {@code null} when
     * {@code task.partitioned-writer-count} has not been set.
     */
    @Min(1)
    public Integer getPartitionedWriterCount()
    {
        return partitionedWriterCount;
    }

    // NOTE: partitioned writer count needs to be a power of two for java query engine.
    @Config("task.partitioned-writer-count")
    @ConfigDescription("Number of writer threads per task for partitioned writes. If not set, the number set by task.writer-count will be used")
    public TaskManagerConfig setPartitionedWriterCount(Integer partitionedWriterCount)
    {
        this.partitionedWriterCount = partitionedWriterCount;
        return this;
    }

    /**
     * Returns the value of {@code task.concurrency}; defaults to 16.
     */
    @Min(1)
    public int getTaskConcurrency()
    {
        return taskConcurrency;
    }

    @Config("task.concurrency")
    @ConfigDescription("Default number of local parallel jobs per worker")
    public TaskManagerConfig setTaskConcurrency(int taskConcurrency)
    {
        this.taskConcurrency = taskConcurrency;
        return this;
    }

    /**
     * Returns the value of {@code task.http-response-threads}; defaults to 100.
     */
    @Min(1)
    public int getHttpResponseThreads()
    {
        return httpResponseThreads;
    }

    @Config("task.http-response-threads")
    public TaskManagerConfig setHttpResponseThreads(int httpResponseThreads)
    {
        this.httpResponseThreads = httpResponseThreads;
        return this;
    }

    /**
     * Returns the value of {@code task.http-timeout-threads}; defaults to 3.
     */
    @Min(1)
    public int getHttpTimeoutThreads()
    {
        return httpTimeoutThreads;
    }

    @Config("task.http-timeout-threads")
    @ConfigDescription("Total number of timeout threads across all timeout thread pools")
    public TaskManagerConfig setHttpTimeoutThreads(int httpTimeoutThreads)
    {
        this.httpTimeoutThreads = httpTimeoutThreads;
        return this;
    }

    /**
     * Returns the value of {@code task.http-timeout-concurrency}; defaults to 3.
     */
    @Min(1)
    public int getHttpTimeoutConcurrency()
    {
        return httpTimeoutConcurrency;
    }

    @Config("task.http-timeout-concurrency")
    @ConfigDescription("Number of thread pools to handle timeouts. Threads per pool is calculated by http-timeout-threads / http-timeout-concurrency")
    public TaskManagerConfig setHttpTimeoutConcurrency(int httpTimeoutConcurrency)
    {
        this.httpTimeoutConcurrency = httpTimeoutConcurrency;
        return this;
    }

    /**
     * Returns the value of {@code task.task-notification-threads}; defaults to 5.
     */
    @Min(1)
    public int getTaskNotificationThreads()
    {
        return taskNotificationThreads;
    }

    @Config("task.task-notification-threads")
    @ConfigDescription("Number of threads used for internal task event notifications")
    public TaskManagerConfig setTaskNotificationThreads(int taskNotificationThreads)
    {
        this.taskNotificationThreads = taskNotificationThreads;
        return this;
    }

    /**
     * Returns the value of {@code task.task-yield-threads}; defaults to 3.
     */
    @Min(1)
    public int getTaskYieldThreads()
    {
        return taskYieldThreads;
    }

    @Config("task.task-yield-threads")
    @ConfigDescription("Number of threads used for setting yield signals")
    public TaskManagerConfig setTaskYieldThreads(int taskYieldThreads)
    {
        this.taskYieldThreads = taskYieldThreads;
        return this;
    }

    /**
     * Returns the value of {@code task.legacy-lifespan-completion-condition}; {@code false} unless configured.
     */
    @Deprecated
    public boolean isLegacyLifespanCompletionCondition()
    {
        return legacyLifespanCompletionCondition;
    }

    @Deprecated
    @Config("task.legacy-lifespan-completion-condition")
    public TaskManagerConfig setLegacyLifespanCompletionCondition(boolean legacyLifespanCompletionCondition)
    {
        this.legacyLifespanCompletionCondition = legacyLifespanCompletionCondition;
        return this;
    }

    /**
     * Returns the value of {@code task.task-priority-tracking}; defaults to
     * {@link TaskPriorityTracking#TASK_FAIR}.
     */
    @NotNull
    public TaskPriorityTracking getTaskPriorityTracking()
    {
        return taskPriorityTracking;
    }

    @Config("task.task-priority-tracking")
    public TaskManagerConfig setTaskPriorityTracking(TaskPriorityTracking taskPriorityTracking)
    {
        this.taskPriorityTracking = taskPriorityTracking;
        return this;
    }

    /**
     * Allowed values for {@code task.task-priority-tracking}.
     */
    public enum TaskPriorityTracking
    {
        TASK_FAIR,
        QUERY_FAIR,
    }

    /**
     * Returns the runaway-split interrupt timeout; defaults to 600 seconds and must be
     * non-null and at least 1s.
     */
    @MinDuration("1s")
    @NotNull
    public Duration getInterruptRunawaySplitsTimeout()
    {
        return interruptRunawaySplitsTimeout;
    }

    @Config("task.interrupt-runaway-splits-timeout")
    @ConfigDescription("Interrupt runaway split threads after this timeout if the task is stuck in certain allow listed places")
    public TaskManagerConfig setInterruptRunawaySplitsTimeout(Duration interruptRunawaySplitsTimeout)
    {
        this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout;
        return this;
    }

    /**
     * Returns the memory-based slowdown threshold; defaults to 1.0 and is constrained to [0.7, 1.0].
     */
    // The lower bound is 70 percent to avoid slowing down the overall cluster by setting the threshold too low.
    @DecimalMin("0.7")
    @DecimalMax("1.0")
    public double getMemoryBasedSlowDownThreshold()
    {
        return memoryBasedSlowDownThreshold;
    }

    @Config("experimental.task.memory-based-slowdown-threshold")
    @ConfigDescription("Pause processing new leaf split if heap memory usage crosses the threshold. This feature is experimental and use it with caution as could lead to deadlock.")
    public TaskManagerConfig setMemoryBasedSlowDownThreshold(double memoryBasedSlowDownThreshold)
    {
        this.memoryBasedSlowDownThreshold = memoryBasedSlowDownThreshold;
        return this;
    }

    /**
     * Returns the value of {@code experimental.task.high-memory-task-killer-enabled}; {@code false} unless configured.
     */
    public boolean isHighMemoryTaskKillerEnabled()
    {
        return highMemoryTaskKillerEnabled;
    }

    @Config("experimental.task.high-memory-task-killer-enabled")
    public TaskManagerConfig setHighMemoryTaskKillerEnabled(boolean highMemoryTaskKillerEnabled)
    {
        this.highMemoryTaskKillerEnabled = highMemoryTaskKillerEnabled;
        return this;
    }

    /**
     * Returns the heap memory threshold used by the high-memory task killer; defaults to 0.9.
     * The value is stored as a primitive, so the returned {@code Double} is never null.
     */
    public Double getHighMemoryTaskKillerHeapMemoryThreshold()
    {
        return highMemoryTaskKillerHeapMemoryThreshold;
    }

    /**
     * Sets the heap memory threshold. The argument is unboxed into a primitive field,
     * so passing {@code null} throws {@link NullPointerException}.
     */
    @Config("experimental.task.high-memory-task-killer-heap-memory-threshold")
    @ConfigDescription("Heap memory threshold to help high task memory killer to identify if workers is running with high heap usage")
    public TaskManagerConfig setHighMemoryTaskKillerHeapMemoryThreshold(Double highMemoryTaskKillerHeapMemoryThreshold)
    {
        this.highMemoryTaskKillerHeapMemoryThreshold = highMemoryTaskKillerHeapMemoryThreshold;
        return this;
    }

    /**
     * Returns the full-GC reclaim memory threshold used by the high-memory task killer; defaults to 0.01.
     * The value is stored as a primitive, so the returned {@code Double} is never null.
     */
    public Double getHighMemoryTaskKillerGCReclaimMemoryThreshold()
    {
        return highMemoryTaskKillerGCReclaimMemoryThreshold;
    }

    /**
     * Sets the full-GC reclaim memory threshold. The argument is unboxed into a primitive
     * field, so passing {@code null} throws {@link NullPointerException}.
     */
    @Config("experimental.task.high-memory-task-killer-reclaim-memory-threshold")
    @ConfigDescription("Full GC Reclaim memory threshold (based on -Xmx) to help high task memory killer to identify if enough memory is reclaimed or not.")
    public TaskManagerConfig setHighMemoryTaskKillerGCReclaimMemoryThreshold(Double highMemoryTaskKillerGCReclaimMemoryThreshold)
    {
        this.highMemoryTaskKillerGCReclaimMemoryThreshold = highMemoryTaskKillerGCReclaimMemoryThreshold;
        return this;
    }

    /**
     * Returns the frequent-full-GC duration threshold used by the high-memory task killer; defaults to 1 second.
     */
    public Duration getHighMemoryTaskKillerFrequentFullGCDurationThreshold()
    {
        return highMemoryTaskKillerFrequentFullGCDurationThreshold;
    }

    @Config("experimental.task.high-memory-task-killer-frequent-full-gc-duration-threshold")
    @ConfigDescription("Threshold to identify if full GCs happening frequently and considered for the task killer to trigger")
    public TaskManagerConfig setHighMemoryTaskKillerFrequentFullGCDurationThreshold(Duration highMemoryTaskKillerFrequentFullGCDurationThreshold)
    {
        this.highMemoryTaskKillerFrequentFullGCDurationThreshold = highMemoryTaskKillerFrequentFullGCDurationThreshold;
        return this;
    }

    /**
     * Returns the high-memory task killer strategy; defaults to
     * {@code HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC}.
     */
    public HighMemoryTaskKillerStrategy getHighMemoryTaskKillerStrategy()
    {
        return highMemoryTaskKillerStrategy;
    }

    @Config("experimental.task.high-memory-task-killer-strategy")
    public TaskManagerConfig setHighMemoryTaskKillerStrategy(HighMemoryTaskKillerStrategy highMemoryTaskKillerStrategy)
    {
        this.highMemoryTaskKillerStrategy = highMemoryTaskKillerStrategy;
        return this;
    }

    /**
     * Returns the value of {@code task.update-size-tracking-enabled}; defaults to {@code true}.
     */
    public boolean isTaskUpdateSizeTrackingEnabled()
    {
        return taskUpdateSizeTrackingEnabled;
    }

    @Config("task.update-size-tracking-enabled")
    public TaskManagerConfig setTaskUpdateSizeTrackingEnabled(boolean taskUpdateSizeTrackingEnabled)
    {
        this.taskUpdateSizeTrackingEnabled = taskUpdateSizeTrackingEnabled;
        return this;
    }
}