TaskExecutor.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.executor;
import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.airlift.stats.TimeDistribution;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.execution.SplitRunner;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking;
import com.facebook.presto.operator.scalar.JoniRegexpFunctions;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.version.EmbedVersion;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.presto.execution.executor.MultilevelSplitQueue.computeLevel;
import static com.facebook.presto.util.MoreMath.min;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.lang.System.lineSeparator;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
@ThreadSafe
public class TaskExecutor
{
private static final Logger log = Logger.get(TaskExecutor.class);
// print out split call stack if it has been running for a certain amount of time
private static final Duration LONG_SPLIT_WARNING_THRESHOLD = new Duration(600, SECONDS);
// Interrupt a split if it is running longer than this AND it's blocked on something known
private static final Predicate<List<StackTraceElement>> DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE = elements ->
elements.stream()
.anyMatch(element -> element.getClassName().equals(JoniRegexpFunctions.CLASS_NAME));
private static final Duration DEFAULT_INTERRUPT_SPLIT_INTERVAL = new Duration(60, SECONDS);
private static final AtomicLong NEXT_RUNNER_ID = new AtomicLong();
private final ExecutorService executor;
private final ThreadPoolExecutorMBean executorMBean;
private final int runnerThreads;
private final int minimumNumberOfDrivers;
private final int guaranteedNumberOfDriversPerTask;
private final int maximumNumberOfDriversPerTask;
private final EmbedVersion embedVersion;
private final Ticker ticker;
private final Duration interruptRunawaySplitsTimeout;
private final Predicate<List<StackTraceElement>> interruptibleSplitPredicate;
private final Duration interruptSplitInterval;
private final ScheduledExecutorService splitMonitorExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("TaskExecutor"));
private final SortedSet<RunningSplitInfo> runningSplitInfos = new ConcurrentSkipListSet<>();
@GuardedBy("this")
private final List<TaskHandle> tasks;
/**
* All splits registered with the task executor.
*/
@GuardedBy("this")
private final Set<PrioritizedSplitRunner> allSplits = new HashSet<>();
/**
* Intermediate splits (i.e. splits that should not be queued).
*/
@GuardedBy("this")
private final Set<PrioritizedSplitRunner> intermediateSplits = new HashSet<>();
/**
* Splits waiting for a runner thread.
*/
private final MultilevelSplitQueue waitingSplits;
/**
* Per query priority trackers
*/
private final Function<QueryId, TaskPriorityTracker> taskPriorityTrackerFactory;
/**
* Splits running on a thread.
*/
private final Set<PrioritizedSplitRunner> runningSplits = newConcurrentHashSet();
/**
* Splits blocked by the driver.
*/
private final Map<PrioritizedSplitRunner, Future<?>> blockedSplits = new ConcurrentHashMap<>();
private final AtomicLongArray completedTasksPerLevel = new AtomicLongArray(5);
private final AtomicLongArray completedSplitsPerLevel = new AtomicLongArray(5);
private final TimeStat splitQueuedTime = new TimeStat(NANOSECONDS);
private final TimeStat splitWallTime = new TimeStat(NANOSECONDS);
private final TimeDistribution leafSplitWallTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution intermediateSplitWallTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution leafSplitScheduledTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution intermediateSplitScheduledTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution leafSplitWaitTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution intermediateSplitWaitTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution leafSplitCpuTime = new TimeDistribution(MICROSECONDS);
private final TimeDistribution intermediateSplitCpuTime = new TimeDistribution(MICROSECONDS);
// shared between SplitRunners
private final CounterStat globalCpuTimeMicros = new CounterStat();
private final CounterStat globalScheduledTimeMicros = new CounterStat();
private final CounterStat splitSkippedDueToMemoryPressure = new CounterStat();
private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS);
private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS);
private volatile boolean closed;
private volatile boolean lowMemory;
@Inject
public TaskExecutor(TaskManagerConfig config, EmbedVersion embedVersion, MultilevelSplitQueue splitQueue)
{
this(requireNonNull(config, "config is null").getMaxWorkerThreads(),
config.getMinDrivers(),
config.getMinDriversPerTask(),
config.getMaxDriversPerTask(),
config.getTaskPriorityTracking(),
config.getInterruptRunawaySplitsTimeout(),
DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE,
DEFAULT_INTERRUPT_SPLIT_INTERVAL,
embedVersion,
splitQueue,
Ticker.systemTicker());
}
@VisibleForTesting
public TaskExecutor(
int runnerThreads,
int minDrivers,
int guaranteedNumberOfDriversPerTask,
int maximumNumberOfDriversPerTask,
TaskPriorityTracking taskPriorityTracking,
Ticker ticker)
{
this(
runnerThreads,
minDrivers,
guaranteedNumberOfDriversPerTask,
maximumNumberOfDriversPerTask,
taskPriorityTracking,
new TaskManagerConfig().getInterruptRunawaySplitsTimeout(),
DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE,
DEFAULT_INTERRUPT_SPLIT_INTERVAL,
new EmbedVersion(new ServerConfig()),
new MultilevelSplitQueue(2), ticker);
}
@VisibleForTesting
public TaskExecutor(
int runnerThreads,
int minDrivers,
int guaranteedNumberOfDriversPerTask,
int maximumNumberOfDriversPerTask,
TaskPriorityTracking taskPriorityTracking,
MultilevelSplitQueue splitQueue,
Ticker ticker)
{
this(
runnerThreads,
minDrivers,
guaranteedNumberOfDriversPerTask,
maximumNumberOfDriversPerTask,
taskPriorityTracking,
new TaskManagerConfig().getInterruptRunawaySplitsTimeout(),
DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE,
DEFAULT_INTERRUPT_SPLIT_INTERVAL,
new EmbedVersion(new ServerConfig()),
splitQueue,
ticker);
}
@VisibleForTesting
public TaskExecutor(
int runnerThreads,
int minDrivers,
int guaranteedNumberOfDriversPerTask,
int maximumNumberOfDriversPerTask,
TaskPriorityTracking taskPriorityTracking,
Duration interruptRunawaySplitsTimeout,
Predicate<List<StackTraceElement>> interruptibleSplitPredicate,
Duration interruptSplitInterval,
EmbedVersion embedVersion,
MultilevelSplitQueue splitQueue,
Ticker ticker)
{
checkArgument(runnerThreads > 0, "runnerThreads must be at least 1");
checkArgument(guaranteedNumberOfDriversPerTask > 0, "guaranteedNumberOfDriversPerTask must be at least 1");
checkArgument(maximumNumberOfDriversPerTask > 0, "maximumNumberOfDriversPerTask must be at least 1");
checkArgument(guaranteedNumberOfDriversPerTask <= maximumNumberOfDriversPerTask, "guaranteedNumberOfDriversPerTask cannot be greater than maximumNumberOfDriversPerTask");
checkArgument(interruptRunawaySplitsTimeout.getValue(SECONDS) >= 1.0, "interruptRunawaySplitsTimeout must be at least 1 second");
checkArgument(interruptSplitInterval.getValue(SECONDS) >= 1.0, "interruptSplitInterval must be at least 1 second");
// we manage thread pool size directly, so create an unlimited pool
this.executor = newCachedThreadPool(threadsNamed("task-processor-%s"));
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor);
this.runnerThreads = runnerThreads;
this.embedVersion = requireNonNull(embedVersion, "embedVersion is null");
this.ticker = requireNonNull(ticker, "ticker is null");
this.minimumNumberOfDrivers = minDrivers;
this.guaranteedNumberOfDriversPerTask = guaranteedNumberOfDriversPerTask;
this.maximumNumberOfDriversPerTask = maximumNumberOfDriversPerTask;
this.waitingSplits = requireNonNull(splitQueue, "splitQueue is null");
Function<QueryId, TaskPriorityTracker> taskPriorityTrackerFactory;
switch (taskPriorityTracking) {
case TASK_FAIR:
taskPriorityTrackerFactory = (queryId) -> new TaskPriorityTracker(splitQueue);
break;
case QUERY_FAIR:
LoadingCache<QueryId, TaskPriorityTracker> cache = CacheBuilder.newBuilder()
.weakValues()
.build(CacheLoader.from(queryId -> new TaskPriorityTracker(splitQueue)));
taskPriorityTrackerFactory = cache::getUnchecked;
break;
default:
throw new IllegalArgumentException("Unexpected taskPriorityTracking: " + taskPriorityTracking);
}
this.taskPriorityTrackerFactory = taskPriorityTrackerFactory;
this.tasks = new LinkedList<>();
this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout;
this.interruptibleSplitPredicate = interruptibleSplitPredicate;
this.interruptSplitInterval = interruptSplitInterval;
}
@PostConstruct
public synchronized void start()
{
checkState(!closed, "TaskExecutor is closed");
for (int i = 0; i < runnerThreads; i++) {
addRunnerThread();
}
if (interruptRunawaySplitsTimeout != null) {
long interval = (long) interruptSplitInterval.getValue(SECONDS);
splitMonitorExecutor.scheduleAtFixedRate(this::interruptRunawaySplits, interval, interval, SECONDS);
}
}
@PreDestroy
public synchronized void stop()
{
closed = true;
executor.shutdownNow();
splitMonitorExecutor.shutdownNow();
}
@Override
public synchronized String toString()
{
return toStringHelper(this)
.add("runnerThreads", runnerThreads)
.add("allSplits", allSplits.size())
.add("intermediateSplits", intermediateSplits.size())
.add("waitingSplits", waitingSplits.size())
.add("runningSplits", runningSplits.size())
.add("blockedSplits", blockedSplits.size())
.toString();
}
private synchronized void addRunnerThread()
{
try {
executor.execute(embedVersion.embedVersion(new TaskRunner()));
}
catch (RejectedExecutionException ignored) {
}
}
public synchronized TaskHandle addTask(
TaskId taskId,
DoubleSupplier utilizationSupplier,
int initialSplitConcurrency,
Duration splitConcurrencyAdjustFrequency,
OptionalInt maxDriversPerTask)
{
requireNonNull(taskId, "taskId is null");
requireNonNull(utilizationSupplier, "utilizationSupplier is null");
checkArgument(!maxDriversPerTask.isPresent() || maxDriversPerTask.getAsInt() <= maximumNumberOfDriversPerTask,
"maxDriversPerTask cannot be greater than the configured value");
log.debug("Task scheduled %s", taskId);
TaskHandle taskHandle = new TaskHandle(
taskId,
taskPriorityTrackerFactory.apply(taskId.getQueryId()),
utilizationSupplier,
initialSplitConcurrency,
splitConcurrencyAdjustFrequency,
maxDriversPerTask);
tasks.add(taskHandle);
return taskHandle;
}
public void removeTask(TaskHandle taskHandle)
{
try (SetThreadName ignored = new SetThreadName("Task-%s", taskHandle.getTaskId())) {
doRemoveTask(taskHandle);
}
// replace blocked splits that were terminated
addNewEntrants();
}
private void doRemoveTask(TaskHandle taskHandle)
{
List<PrioritizedSplitRunner> splits;
synchronized (this) {
tasks.remove(taskHandle);
splits = taskHandle.destroy();
// stop tracking splits (especially blocked splits which may never unblock)
allSplits.removeAll(splits);
intermediateSplits.removeAll(splits);
blockedSplits.keySet().removeAll(splits);
waitingSplits.removeAll(splits);
}
// call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
for (PrioritizedSplitRunner split : splits) {
split.destroy();
}
// record completed stats
long threadUsageNanos = taskHandle.getScheduledNanos();
completedTasksPerLevel.incrementAndGet(computeLevel(threadUsageNanos));
log.debug("Task finished or failed %s", taskHandle.getTaskId());
}
public List<ListenableFuture<?>> enqueueSplits(TaskHandle taskHandle, boolean intermediate, List<? extends SplitRunner> taskSplits)
{
List<PrioritizedSplitRunner> splitsToDestroy = new ArrayList<>();
List<ListenableFuture<?>> finishedFutures = new ArrayList<>(taskSplits.size());
synchronized (this) {
for (SplitRunner taskSplit : taskSplits) {
PrioritizedSplitRunner prioritizedSplitRunner = new PrioritizedSplitRunner(
taskHandle,
taskSplit,
ticker,
globalCpuTimeMicros,
globalScheduledTimeMicros,
blockedQuantaWallTime,
unblockedQuantaWallTime);
if (intermediate) {
// add the runner to the handle so it can be destroyed if the task is canceled
if (taskHandle.recordIntermediateSplit(prioritizedSplitRunner)) {
// Note: we do not record queued time for intermediate splits
startIntermediateSplit(prioritizedSplitRunner);
}
else {
splitsToDestroy.add(prioritizedSplitRunner);
}
}
else {
// add this to the work queue for the task
if (taskHandle.enqueueSplit(prioritizedSplitRunner)) {
// if task is under the limit for guaranteed splits, start one
scheduleTaskIfNecessary(taskHandle);
// if globally we have more resources, start more
addNewEntrants();
}
else {
splitsToDestroy.add(prioritizedSplitRunner);
}
}
finishedFutures.add(prioritizedSplitRunner.getFinishedFuture());
}
}
for (PrioritizedSplitRunner split : splitsToDestroy) {
split.destroy();
}
return finishedFutures;
}
private void splitFinished(PrioritizedSplitRunner split)
{
completedSplitsPerLevel.incrementAndGet(split.getPriority().getLevel());
synchronized (this) {
allSplits.remove(split);
long wallNanos = System.nanoTime() - split.getCreatedNanos();
splitWallTime.add(Duration.succinctNanos(wallNanos));
if (intermediateSplits.remove(split)) {
intermediateSplitWallTime.add(wallNanos);
intermediateSplitScheduledTime.add(split.getScheduledNanos());
intermediateSplitWaitTime.add(split.getWaitNanos());
intermediateSplitCpuTime.add(split.getCpuTimeNanos());
}
else {
leafSplitWallTime.add(wallNanos);
leafSplitScheduledTime.add(split.getScheduledNanos());
leafSplitWaitTime.add(split.getWaitNanos());
leafSplitCpuTime.add(split.getCpuTimeNanos());
}
TaskHandle taskHandle = split.getTaskHandle();
taskHandle.splitComplete(split);
scheduleTaskIfNecessary(taskHandle);
addNewEntrants();
}
// call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
split.destroy();
}
private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle)
{
// Worker skip processing split if jvm heap usage crosses configured threshold
// Helps reduce memory pressure on the worker and avoid OOMs
if (isLowMemory()) {
log.debug("Skip task scheduling due to low memory");
splitSkippedDueToMemoryPressure.update(1);
return;
}
// if task has less than the minimum guaranteed splits running,
// immediately schedule a new split for this task. This assures
// that a task gets its fair amount of consideration (you have to
// have splits to be considered for running on a thread).
if (taskHandle.getRunningLeafSplits() < min(guaranteedNumberOfDriversPerTask, taskHandle.getMaxDriversPerTask().orElse(Integer.MAX_VALUE))) {
PrioritizedSplitRunner split = taskHandle.pollNextSplit();
if (split != null) {
startSplit(split);
splitQueuedTime.add(Duration.nanosSince(split.getCreatedNanos()));
}
}
}
private synchronized void addNewEntrants()
{
// Worker skip processing split if jvm heap usage crosses configured threshold
// Helps reduce memory pressure on the worker and avoid OOMs
if (isLowMemory()) {
log.debug("Skip polling next split worker due to low memory");
splitSkippedDueToMemoryPressure.update(1);
return;
}
// Ignore intermediate splits when checking minimumNumberOfDrivers.
// Otherwise with (for example) minimumNumberOfDrivers = 100, 200 intermediate splits
// and 100 leaf splits, depending on order of appearing splits, number of
// simultaneously running splits may vary. If leaf splits start first, there will
// be 300 running splits. If intermediate splits start first, there will be only
// 200 running splits.
int running = allSplits.size() - intermediateSplits.size();
for (int i = 0; i < minimumNumberOfDrivers - running; i++) {
PrioritizedSplitRunner split = pollNextSplitWorker();
if (split == null) {
break;
}
splitQueuedTime.add(Duration.nanosSince(split.getCreatedNanos()));
startSplit(split);
}
}
private synchronized void startIntermediateSplit(PrioritizedSplitRunner split)
{
startSplit(split);
intermediateSplits.add(split);
}
private synchronized void startSplit(PrioritizedSplitRunner split)
{
allSplits.add(split);
waitingSplits.offer(split);
}
private synchronized PrioritizedSplitRunner pollNextSplitWorker()
{
// todo find a better algorithm for this
// find the first task that produces a split, then move that task to the
// end of the task list, so we get round robin
for (Iterator<TaskHandle> iterator = tasks.iterator(); iterator.hasNext(); ) {
TaskHandle task = iterator.next();
// skip tasks that are already running the configured max number of drivers
if (task.getRunningLeafSplits() >= task.getMaxDriversPerTask().orElse(maximumNumberOfDriversPerTask)) {
continue;
}
PrioritizedSplitRunner split = task.pollNextSplit();
if (split != null) {
// move task to end of list
iterator.remove();
// CAUTION: we are modifying the list in the loop which would normally
// cause a ConcurrentModificationException but we exit immediately
tasks.add(task);
return split;
}
}
return null;
}
private void interruptRunawaySplits()
{
for (RunningSplitInfo splitInfo : runningSplitInfos) {
Duration duration = Duration.succinctNanos(ticker.read() - splitInfo.getStartTime());
if (duration.compareTo(interruptRunawaySplitsTimeout) < 0) {
continue;
}
List<StackTraceElement> stack = asList(splitInfo.getThread().getStackTrace());
if (interruptibleSplitPredicate.test(stack)) {
String stackString = stack.stream()
.map(Object::toString)
.collect(joining(lineSeparator()));
log.warn("Interrupting runaway split " + splitInfo.getSplitInfo() + lineSeparator() + stackString);
splitInfo.getThread().interrupt();
}
}
}
private class TaskRunner
implements Runnable
{
private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) {
// select next worker
final PrioritizedSplitRunner split;
try {
split = waitingSplits.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread(), split);
runningSplitInfos.add(splitInfo);
runningSplits.add(split);
ListenableFuture<?> blocked;
try {
blocked = split.process();
}
finally {
runningSplitInfos.remove(splitInfo);
runningSplits.remove(split);
}
if (split.isFinished()) {
// Avoid calling split.getInfo() when debug logging is not enabled
if (log.isDebugEnabled()) {
log.debug("%s is finished", split.getInfo());
}
splitFinished(split);
}
else {
if (blocked.isDone()) {
waitingSplits.offer(split);
}
else {
blockedSplits.put(split, blocked);
blocked.addListener(() -> {
blockedSplits.remove(split);
// reset the level priority to prevent previously-blocked splits from starving existing splits
split.resetLevelPriority();
waitingSplits.offer(split);
}, executor);
}
}
}
catch (Throwable t) {
// ignore random errors due to driver thread interruption
if (!split.isDestroyed()) {
if (t instanceof PrestoException) {
PrestoException e = (PrestoException) t;
log.error("Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
}
else {
log.error(t, "Error processing %s", split.getInfo());
}
}
splitFinished(split);
}
finally {
// Clear the interrupted flag on the current thread, driver cancellation may have triggered an interrupt
// without TaskExecutor being shut down
if (Thread.interrupted()) {
if (closed) {
// reset interrupted flag if TaskExecutor was closed, since that interrupt may have been the
// shutdown signal to this TaskRunner thread
Thread.currentThread().interrupt();
}
}
}
}
}
finally {
// unless we have been closed, we need to replace this thread
if (!closed) {
addRunnerThread();
}
}
}
}
//
// STATS
//
@Managed
public synchronized int getTasks()
{
return tasks.size();
}
@Managed
public int getRunnerThreads()
{
return runnerThreads;
}
@Managed
public int getMinimumNumberOfDrivers()
{
return minimumNumberOfDrivers;
}
@Managed
public synchronized int getTotalSplits()
{
return allSplits.size();
}
@Managed
public synchronized int getIntermediateSplits()
{
return intermediateSplits.size();
}
@Managed
public int getWaitingSplits()
{
return waitingSplits.size();
}
@Managed
public int getRunningSplits()
{
return runningSplits.size();
}
@Managed
public int getBlockedSplits()
{
return blockedSplits.size();
}
@Managed
public long getCompletedTasksLevel0()
{
return completedTasksPerLevel.get(0);
}
@Managed
public long getCompletedTasksLevel1()
{
return completedTasksPerLevel.get(1);
}
@Managed
public long getCompletedTasksLevel2()
{
return completedTasksPerLevel.get(2);
}
@Managed
public long getCompletedTasksLevel3()
{
return completedTasksPerLevel.get(3);
}
@Managed
public long getCompletedTasksLevel4()
{
return completedTasksPerLevel.get(4);
}
@Managed
public long getCompletedSplitsLevel0()
{
return completedSplitsPerLevel.get(0);
}
@Managed
public long getCompletedSplitsLevel1()
{
return completedSplitsPerLevel.get(1);
}
@Managed
public long getCompletedSplitsLevel2()
{
return completedSplitsPerLevel.get(2);
}
@Managed
public long getCompletedSplitsLevel3()
{
return completedSplitsPerLevel.get(3);
}
@Managed
public long getCompletedSplitsLevel4()
{
return completedSplitsPerLevel.get(4);
}
@Managed
public long getRunningTasksLevel0()
{
return getRunningTasksForLevel(0);
}
@Managed
public long getRunningTasksLevel1()
{
return getRunningTasksForLevel(1);
}
@Managed
public long getRunningTasksLevel2()
{
return getRunningTasksForLevel(2);
}
@Managed
public long getRunningTasksLevel3()
{
return getRunningTasksForLevel(3);
}
@Managed
public long getRunningTasksLevel4()
{
return getRunningTasksForLevel(4);
}
@Managed
@Nested
public TimeStat getSplitQueuedTime()
{
return splitQueuedTime;
}
@Managed
@Nested
public TimeStat getSplitWallTime()
{
return splitWallTime;
}
@Managed
@Nested
public TimeStat getBlockedQuantaWallTime()
{
return blockedQuantaWallTime;
}
@Managed
@Nested
public TimeStat getUnblockedQuantaWallTime()
{
return unblockedQuantaWallTime;
}
@Managed
@Nested
public TimeDistribution getLeafSplitScheduledTime()
{
return leafSplitScheduledTime;
}
@Managed
@Nested
public TimeDistribution getIntermediateSplitScheduledTime()
{
return intermediateSplitScheduledTime;
}
@Managed
@Nested
public TimeDistribution getLeafSplitWallTime()
{
return leafSplitWallTime;
}
@Managed
@Nested
public TimeDistribution getIntermediateSplitWallTime()
{
return intermediateSplitWallTime;
}
@Managed
@Nested
public TimeDistribution getLeafSplitWaitTime()
{
return leafSplitWaitTime;
}
@Managed
@Nested
public TimeDistribution getIntermediateSplitWaitTime()
{
return intermediateSplitWaitTime;
}
@Managed
@Nested
public TimeDistribution getLeafSplitCpuTime()
{
return leafSplitCpuTime;
}
@Managed
@Nested
public TimeDistribution getIntermediateSplitCpuTime()
{
return intermediateSplitCpuTime;
}
@Managed
@Nested
public CounterStat getGlobalScheduledTimeMicros()
{
return globalScheduledTimeMicros;
}
@Managed
@Nested
public CounterStat getGlobalCpuTimeMicros()
{
return globalCpuTimeMicros;
}
@Managed
@Nested
public CounterStat getSplitSkippedDueToMemoryPressure()
{
return splitSkippedDueToMemoryPressure;
}
private synchronized int getRunningTasksForLevel(int level)
{
int count = 0;
for (TaskHandle task : tasks) {
if (task.getPriority().getLevel() == level) {
count++;
}
}
return count;
}
public String getMaxActiveSplitsInfo()
{
// Sample output:
//
// 2 splits have been continuously active for more than 600.00ms seconds
//
// "20180907_054754_00000_88xi4.1.0-2" tid=99 status=running detail=Split 20210429_154412_56509_f63i2.27.0.22-0 {path=hdfs://ns-prod/user/hive/freight_analytics.db/analytics_loads/000002_0, start=67108864, length=67108864, fileSize=531637256, hosts=[], database=freight_analytics, table=analytics_loads, forceLocalScheduling=false, partitionName=<UNPARTITIONED>, s3SelectPushdownEnabled=false} (start = 2.3639101134349113E10, wall = 34 ms, cpu = 0 ms, wait = 189 ms, calls = 1)
// at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2708)
// at java.util.Formatter.parse(Formatter.java:2560)
// at java.util.Formatter.format(Formatter.java:2501)
// at ... (more lines of stacktrace)
//
// "20180907_054754_00000_88xi4.1.0-3" tid=106 status=running detail=Error processing Split 20210423_234231_69820_3jigi.2.0.19-43 (start = 2.3149319001729774E10, wall = 143673 ms, cpu = 51060 ms, wait = 34701 ms, calls = 70)
// at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2709)
// at java.util.Formatter.parse(Formatter.java:2560)
// at java.util.Formatter.format(Formatter.java:2501)
// at ... (more line of stacktrace)
StringBuilder stackTrace = new StringBuilder();
int maxActiveSplitCount = 0;
String message = "%s splits have been continuously active for more than %s seconds\n";
for (RunningSplitInfo splitInfo : runningSplitInfos) {
Duration duration = Duration.succinctNanos(ticker.read() - splitInfo.getStartTime());
if (duration.compareTo(LONG_SPLIT_WARNING_THRESHOLD) >= 0) {
maxActiveSplitCount++;
stackTrace.append("\n");
stackTrace.append(String.format("\"%s\" tid=%s status=%s detail=%s", splitInfo.getThreadId(), splitInfo.getThread().getId(), splitInfo.isFinished() ? "finished" : "running", splitInfo.getSplitInfo())).append("\n");
for (StackTraceElement traceElement : splitInfo.getThread().getStackTrace()) {
stackTrace.append("\tat ").append(traceElement).append("\n");
}
}
}
return String.format(message, maxActiveSplitCount, LONG_SPLIT_WARNING_THRESHOLD).concat(stackTrace.toString());
}
@Managed
public long getRunAwaySplitCount()
{
int count = 0;
for (RunningSplitInfo splitInfo : runningSplitInfos) {
Duration duration = Duration.succinctNanos(ticker.read() - splitInfo.getStartTime());
if (duration.compareTo(LONG_SPLIT_WARNING_THRESHOLD) > 0) {
count++;
}
}
return count;
}
private static class RunningSplitInfo
implements Comparable<RunningSplitInfo>
{
private final long startTime;
private final String threadId;
private final Thread thread;
private final PrioritizedSplitRunner split;
private boolean printed;
public RunningSplitInfo(long startTime, String threadId, Thread thread, PrioritizedSplitRunner split)
{
this.startTime = startTime;
this.threadId = threadId;
this.thread = thread;
this.split = split;
this.printed = false;
}
public long getStartTime()
{
return startTime;
}
public String getThreadId()
{
return threadId;
}
public Thread getThread()
{
return thread;
}
public boolean isPrinted()
{
return printed;
}
public void setPrinted()
{
printed = true;
}
public boolean isFinished()
{
return split.isFinished();
}
public String getSplitInfo()
{
return split.getInfo();
}
@Override
public int compareTo(RunningSplitInfo o)
{
return ComparisonChain.start()
.compare(startTime, o.getStartTime())
.compare(threadId, o.getThreadId())
.result();
}
}
@Managed(description = "Task processor executor")
@Nested
public ThreadPoolExecutorMBean getProcessorExecutor()
{
return executorMBean;
}
public void setLowMemory(boolean lowMemory)
{
this.lowMemory = lowMemory;
}
public boolean isLowMemory()
{
return this.lowMemory;
}
}