InternalResourceGroup.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.resourceGroups;

import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.execution.ManagedQueryExecution;
import com.facebook.presto.execution.resourceGroups.WeightedFairQueue.Usage;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.QueryStateInfo;
import com.facebook.presto.server.ResourceGroupInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.resourceGroups.ResourceGroup;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.spi.resourceGroups.ResourceGroupState;
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

import static com.facebook.presto.SystemSessionProperties.getQueryPriority;
import static com.facebook.presto.common.ErrorType.USER_ERROR;
import static com.facebook.presto.server.QueryStateInfo.createQueryStateInfo;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_RESOURCE_GROUP;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits.NO_LIMITS;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_QUEUE;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_RUN;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.FULL;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.FAIR;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.QUERY_PRIORITY;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.WEIGHTED;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.WEIGHTED_FAIR;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.math.LongMath.saturatedAdd;
import static com.google.common.math.LongMath.saturatedMultiply;
import static com.google.common.math.LongMath.saturatedSubtract;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Resource groups form a tree, and all access to a group is guarded by the root of the tree.
 * Queries are submitted to leaf groups. Never to intermediate groups. Intermediate groups
 * aggregate resource consumption from their children, and may have their own limitations that
 * are enforced.
 */
@ThreadSafe
public class InternalResourceGroup
        implements ResourceGroup
{
    // Initial scheduling weight of every group (see schedulingWeight below).
    public static final int DEFAULT_WEIGHT = 1;

    // Root of the group tree; its monitor guards all mutable state of every group in the tree.
    private final InternalResourceGroup root;
    // Empty only for the root group itself.
    private final Optional<InternalResourceGroup> parent;
    private final ResourceGroupId id;
    // Invoked (outside the root lock) whenever setJmxExport is called.
    private final BiConsumer<InternalResourceGroup, Boolean> jmxExportListener;
    // Used to start queries asynchronously (see startInBackground).
    private final Executor executor;
    // A child is static only if its parent is static and its own segment is static (see getOrCreateSubGroup).
    private final boolean staticResourceGroup;
    // Shared with all sub groups. NOTE(review): presumably the source behind getAdditionalRuntimeInfo(),
    // whose results are added to local counts; that method is not in this view — confirm.
    private final Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo;
    // Shared with all sub groups; consulted by code outside this view.
    private final Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate;
    // Shared with all sub groups; consulted by code outside this view.
    private final InternalNodeManager nodeManager;

    // Configuration
    // =============
    @GuardedBy("root")
    private long softMemoryLimitBytes = Long.MAX_VALUE;
    @GuardedBy("root")
    private int softConcurrencyLimit;
    @GuardedBy("root")
    private int workersPerQueryLimit;
    @GuardedBy("root")
    private int hardConcurrencyLimit;
    @GuardedBy("root")
    private int maxQueuedQueries;
    @GuardedBy("root")
    private long softCpuLimitMillis = Long.MAX_VALUE;
    @GuardedBy("root")
    private long hardCpuLimitMillis = Long.MAX_VALUE;
    @GuardedBy("root")
    private long cpuQuotaGenerationMillisPerSecond = Long.MAX_VALUE;
    @GuardedBy("root")
    private int schedulingWeight = DEFAULT_WEIGHT;
    @GuardedBy("root")
    private SchedulingPolicy schedulingPolicy = FAIR;
    @GuardedBy("root")
    private boolean jmxExport;
    @GuardedBy("root")
    private ResourceGroupQueryLimits perQueryLimits = NO_LIMITS;

    // Live data structures
    // ====================
    @GuardedBy("root")
    private final Map<String, InternalResourceGroup> subGroups = new HashMap<>();
    // Sub groups that have queued queries and the capacity to run them;
    // internalStartNext() is expected to succeed when called on them.
    // Not final: setSchedulingPolicy swaps in a queue implementation matching the new policy.
    @GuardedBy("root")
    private Queue<InternalResourceGroup> eligibleSubGroups = new FifoQueue<>();
    // Sub groups whose memory usage may be out of date, most likely because they have a running query.
    @GuardedBy("root")
    private final Set<InternalResourceGroup> dirtySubGroups = new HashSet<>();
    // Queries waiting in this (leaf) group. Not final: replaced by setSchedulingPolicy.
    @GuardedBy("root")
    private TieredQueue<ManagedQueryExecution> queuedQueries = new TieredQueue<>(FifoQueue::new);
    @GuardedBy("root")
    private final Set<ManagedQueryExecution> runningQueries = new HashSet<>();
    // Running queries in all descendants (excludes this group's own runningQueries).
    @GuardedBy("root")
    private int descendantRunningQueries;
    // Queued queries in all descendants (excludes this group's own queuedQueries).
    @GuardedBy("root")
    private int descendantQueuedQueries;
    // Memory usage is cached because it changes very rapidly while queries are running, and would be expensive to track continuously
    @GuardedBy("root")
    private long cachedMemoryUsageBytes;
    // Increased when queries finish (queryFinished), decreased as quota is generated (internalGenerateCpuQuota).
    @GuardedBy("root")
    private long cpuUsageMillis;
    // Time a sub group last started a query; 0 means "no previous start" (reset in updateEligibility).
    @GuardedBy("root")
    private long lastStartMillis;
    @GuardedBy("root")
    private final CounterStat timeBetweenStartsSec = new CounterStat();

    // Wall-clock time a query last started in this group or any descendant (set in startInBackground).
    @GuardedBy("root")
    private AtomicLong lastRunningQueryStartTime = new AtomicLong(currentTimeMillis());
    // Set by setDirty(); cleared by the parent's internalRefreshStats after re-evaluating eligibility.
    @GuardedBy("root")
    private AtomicBoolean isDirty = new AtomicBoolean();

    /**
     * Creates a group. When {@code parent} is present, the new group's id extends the parent's id
     * with {@code name} and it shares the parent's root. Otherwise the group becomes its own root.
     *
     * @throws NullPointerException if any reference argument is null
     */
    protected InternalResourceGroup(
            Optional<InternalResourceGroup> parent,
            String name,
            BiConsumer<InternalResourceGroup, Boolean> jmxExportListener,
            Executor executor,
            boolean staticResourceGroup,
            Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
            Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
            InternalNodeManager nodeManager)
    {
        // Null checks keep the original order so the same message is reported first.
        this.parent = requireNonNull(parent, "parent is null");
        this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null");
        this.executor = requireNonNull(executor, "executor is null");
        this.nodeManager = requireNonNull(nodeManager, "node manager is null");
        requireNonNull(name, "name is null");

        if (!parent.isPresent()) {
            // Top of the tree: this group guards itself.
            this.id = new ResourceGroupId(name);
            this.root = this;
        }
        else {
            InternalResourceGroup parentGroup = parent.get();
            this.id = new ResourceGroupId(parentGroup.id, name);
            this.root = parentGroup.root;
        }

        this.staticResourceGroup = staticResourceGroup;
        this.additionalRuntimeInfo = requireNonNull(additionalRuntimeInfo, "additionalRuntimeInfo is null");
        this.shouldWaitForResourceManagerUpdate = requireNonNull(shouldWaitForResourceManagerUpdate, "shouldWaitForResourceManagerUpdate is null");
    }

    /**
     * Builds a snapshot of this group.
     *
     * @param includeQueryInfo whether to attach the running queries of this subtree
     * @param summarizeSubgroups whether active sub groups are reported as summaries (no nested detail)
     * @param includeStaticSubgroupsOnly whether to drop sub groups that are not static
     * @return the snapshot; only sub groups with at least one running or queued query are listed
     */
    public ResourceGroupInfo getResourceGroupInfo(boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly)
    {
        synchronized (root) {
            ImmutableList<ResourceGroupInfo> activeChildren = subGroups.values().stream()
                    .filter(child -> child.getRunningQueries() + child.getQueuedQueries() > 0)
                    .filter(child -> child.isStaticResourceGroup() || !includeStaticSubgroupsOnly)
                    .map(child -> summarizeSubgroups
                            ? child.getSummaryInfo()
                            : child.getResourceGroupInfo(includeQueryInfo, false, includeStaticSubgroupsOnly))
                    .collect(toImmutableList());
            List<QueryStateInfo> runningQueryInfos = includeQueryInfo ? getAggregatedRunningQueriesInfo() : null;

            return new ResourceGroupInfo(
                    id,
                    getState(),
                    schedulingPolicy,
                    schedulingWeight,
                    DataSize.succinctBytes(softMemoryLimitBytes),
                    softConcurrencyLimit,
                    hardConcurrencyLimit,
                    maxQueuedQueries,
                    DataSize.succinctBytes(cachedMemoryUsageBytes),
                    getQueuedQueries(),
                    getRunningQueries(),
                    eligibleSubGroups.size(),
                    activeChildren,
                    runningQueryInfos,
                    workersPerQueryLimit);
        }
    }

    /**
     * Returns a snapshot of this group. Active sub groups are listed as summaries and no per-query
     * details are included.
     */
    public ResourceGroupInfo getInfo()
    {
        // No query info, summarized sub groups, and no static-only filtering.
        return getResourceGroupInfo(false, true, false);
    }

    /**
     * Returns this group's own counters and limits. Both the sub group list and the running query
     * list are omitted (null).
     */
    private ResourceGroupInfo getSummaryInfo()
    {
        synchronized (root) {
            DataSize softMemoryLimit = DataSize.succinctBytes(softMemoryLimitBytes);
            DataSize memoryUsage = DataSize.succinctBytes(cachedMemoryUsageBytes);
            int queued = getQueuedQueries();
            int running = getRunningQueries();
            return new ResourceGroupInfo(
                    id,
                    getState(),
                    schedulingPolicy,
                    schedulingWeight,
                    softMemoryLimit,
                    softConcurrencyLimit,
                    hardConcurrencyLimit,
                    maxQueuedQueries,
                    memoryUsage,
                    queued,
                    running,
                    eligibleSubGroups.size(),
                    null,
                    null,
                    workersPerQueryLimit);
        }
    }

    /**
     * Returns whether this group is static: the constructor flag, which for sub groups is the parent's
     * flag AND-ed with the segment's.
     */
    boolean isStaticResourceGroup()
    {
        return this.staticResourceGroup;
    }

    /**
     * Returns the admission state: {@code CAN_RUN} if another query may run, otherwise
     * {@code CAN_QUEUE} if another may be queued, otherwise {@code FULL}.
     */
    private ResourceGroupState getState()
    {
        synchronized (root) {
            if (canRunMore()) {
                return CAN_RUN;
            }
            return canQueueMore() ? CAN_QUEUE : FULL;
        }
    }

    /**
     * Collects state info for every running query in this subtree. A leaf reports its own running
     * queries. An intermediate group concatenates the results of its sub groups.
     */
    private List<QueryStateInfo> getAggregatedRunningQueriesInfo()
    {
        synchronized (root) {
            if (!subGroups.isEmpty()) {
                return subGroups.values().stream()
                        .flatMap(child -> child.getAggregatedRunningQueriesInfo().stream())
                        .collect(toImmutableList());
            }
            return runningQueries.stream()
                    .map(query -> createQueryStateInfo(query.getBasicQueryInfo()))
                    .collect(toImmutableList());
        }
    }

    /**
     * Returns the info of this group followed by each ancestor, ending with the root.
     */
    public List<ResourceGroupInfo> getPathToRoot()
    {
        synchronized (root) {
            ImmutableList.Builder<ResourceGroupInfo> path = ImmutableList.builder();
            for (Optional<InternalResourceGroup> current = Optional.of(this); current.isPresent(); current = current.get().parent) {
                path.add(current.get().getInfo());
            }
            return path.build();
        }
    }

    /**
     * Returns this group's fully qualified id. The id is immutable, so no lock is needed.
     */
    @Override
    public ResourceGroupId getId()
    {
        return this.id;
    }

    /**
     * Returns the number of queries running in this group and all of its descendants.
     */
    @Managed
    public int getRunningQueries()
    {
        synchronized (root) {
            int own = runningQueries.size();
            return own + descendantRunningQueries;
        }
    }

    /**
     * Like {@link #getRunningQueries()}, plus the running and descendant-running counts reported by
     * {@code getAdditionalRuntimeInfo()}, when that returns a value.
     */
    private int getAggregatedRunningQueries()
    {
        synchronized (root) {
            int aggregated = runningQueries.size() + descendantRunningQueries;
            Optional<ResourceGroupRuntimeInfo> extra = getAdditionalRuntimeInfo();
            if (!extra.isPresent()) {
                return aggregated;
            }
            ResourceGroupRuntimeInfo info = extra.get();
            aggregated += info.getRunningQueries() + info.getDescendantRunningQueries();
            return aggregated;
        }
    }

    /**
     * Returns the number of queries queued in this group and all of its descendants.
     */
    @Managed
    public int getQueuedQueries()
    {
        synchronized (root) {
            int own = queuedQueries.size();
            return own + descendantQueuedQueries;
        }
    }

    /**
     * Returns how many queued queries are waiting on this group's capacity.
     * <p>
     * For a leaf, that is every query in its own queue. For an intermediate group, only sub groups
     * that could run more contribute. Each contributes the smaller of its queued count and its
     * remaining hard-concurrency headroom.
     */
    @Managed
    public int getWaitingQueuedQueries()
    {
        synchronized (root) {
            if (subGroups.isEmpty()) {
                return queuedQueries.size();
            }
            return subGroups.values().stream()
                    .filter(InternalResourceGroup::canRunMore)
                    .mapToInt(child -> min(child.getQueuedQueries(), child.getHardConcurrencyLimit() - child.getRunningQueries()))
                    .sum();
        }
    }

    /**
     * Returns the soft memory limit as a byte-denominated {@link DataSize}.
     */
    @Override
    public DataSize getSoftMemoryLimit()
    {
        synchronized (root) {
            long bytes = softMemoryLimitBytes;
            return new DataSize(bytes, BYTE);
        }
    }

    /**
     * Sets the soft memory limit. Eligibility is re-evaluated if the change flips whether this group
     * can run more queries.
     */
    @Override
    public void setSoftMemoryLimit(DataSize limit)
    {
        synchronized (root) {
            boolean couldRunBefore = canRunMore();
            softMemoryLimitBytes = limit.toBytes();
            boolean canRunNow = canRunMore();
            if (couldRunBefore != canRunNow) {
                updateEligibility();
            }
        }
    }

    /**
     * Returns the soft CPU limit.
     */
    @Override
    public Duration getSoftCpuLimit()
    {
        synchronized (root) {
            long millis = softCpuLimitMillis;
            return new Duration(millis, MILLISECONDS);
        }
    }

    /**
     * Sets the soft CPU limit, first raising the hard limit to match if the new soft limit exceeds it.
     * Eligibility is re-evaluated if the change flips whether this group can run more queries.
     */
    @Override
    public void setSoftCpuLimit(Duration limit)
    {
        synchronized (root) {
            long newLimitMillis = limit.toMillis();
            if (newLimitMillis > hardCpuLimitMillis) {
                setHardCpuLimit(limit);
            }
            boolean couldRunBefore = canRunMore();
            softCpuLimitMillis = newLimitMillis;
            boolean canRunNow = canRunMore();
            if (couldRunBefore != canRunNow) {
                updateEligibility();
            }
        }
    }

    /**
     * Returns the hard CPU limit.
     */
    @Override
    public Duration getHardCpuLimit()
    {
        synchronized (root) {
            long millis = hardCpuLimitMillis;
            return new Duration(millis, MILLISECONDS);
        }
    }

    /**
     * Sets the hard CPU limit, first lowering the soft limit to match if the new hard limit is below it.
     * Eligibility is re-evaluated if the change flips whether this group can run more queries.
     */
    @Override
    public void setHardCpuLimit(Duration limit)
    {
        synchronized (root) {
            long newLimitMillis = limit.toMillis();
            if (newLimitMillis < softCpuLimitMillis) {
                setSoftCpuLimit(limit);
            }
            boolean couldRunBefore = canRunMore();
            hardCpuLimitMillis = newLimitMillis;
            boolean canRunNow = canRunMore();
            if (couldRunBefore != canRunNow) {
                updateEligibility();
            }
        }
    }

    /**
     * Returns how many CPU milliseconds of quota are regenerated per elapsed second.
     */
    @Override
    public long getCpuQuotaGenerationMillisPerSecond()
    {
        synchronized (root) {
            return this.cpuQuotaGenerationMillisPerSecond;
        }
    }

    /**
     * Sets the CPU quota regeneration rate.
     *
     * @throws IllegalArgumentException if {@code rate} is not positive
     */
    @Override
    public void setCpuQuotaGenerationMillisPerSecond(long rate)
    {
        // Validate before taking the lock; the check does not touch shared state.
        checkArgument(rate > 0, "Cpu quota generation must be positive");
        synchronized (root) {
            this.cpuQuotaGenerationMillisPerSecond = rate;
        }
    }

    /**
     * Returns the configured per-query worker limit.
     */
    @Override
    public int getWorkersPerQueryLimit()
    {
        synchronized (root) {
            return this.workersPerQueryLimit;
        }
    }

    /**
     * Sets the per-query worker limit. Eligibility is re-evaluated if the change flips whether this
     * group can run more queries.
     *
     * @throws IllegalArgumentException if the limit is negative
     */
    @Override
    public void setWorkersPerQueryLimit(int workersPerQueryLimit)
    {
        checkArgument(workersPerQueryLimit >= 0, "workersPerQueryLimit is negative");
        synchronized (root) {
            boolean couldRunBefore = canRunMore();
            this.workersPerQueryLimit = workersPerQueryLimit;
            boolean canRunNow = canRunMore();
            if (couldRunBefore != canRunNow) {
                updateEligibility();
            }
        }
    }

    /**
     * Returns the soft concurrency limit.
     */
    @Override
    public int getSoftConcurrencyLimit()
    {
        synchronized (root) {
            return this.softConcurrencyLimit;
        }
    }

    /**
     * Sets the soft concurrency limit. Eligibility is re-evaluated if the change flips whether this
     * group can run more queries.
     *
     * @throws IllegalArgumentException if the limit is negative
     */
    @Override
    public void setSoftConcurrencyLimit(int softConcurrencyLimit)
    {
        checkArgument(softConcurrencyLimit >= 0, "softConcurrencyLimit is negative");
        synchronized (root) {
            boolean couldRunBefore = canRunMore();
            this.softConcurrencyLimit = softConcurrencyLimit;
            boolean canRunNow = canRunMore();
            if (couldRunBefore != canRunNow) {
                updateEligibility();
            }
        }
    }

    /**
     * Returns the hard concurrency limit.
     */
    @Managed
    @Override
    public int getHardConcurrencyLimit()
    {
        synchronized (root) {
            return this.hardConcurrencyLimit;
        }
    }

    /**
     * Sets the hard concurrency limit. Eligibility is re-evaluated if the change flips whether this
     * group can run more queries.
     *
     * @throws IllegalArgumentException if the limit is negative
     */
    @Managed
    @Override
    public void setHardConcurrencyLimit(int hardConcurrencyLimit)
    {
        checkArgument(hardConcurrencyLimit >= 0, "hardConcurrencyLimit is negative");
        synchronized (root) {
            boolean couldRunBefore = canRunMore();
            this.hardConcurrencyLimit = hardConcurrencyLimit;
            boolean canRunNow = canRunMore();
            if (couldRunBefore != canRunNow) {
                updateEligibility();
            }
        }
    }

    /**
     * Returns the maximum number of queries this group may queue.
     */
    @Managed
    @Override
    public int getMaxQueuedQueries()
    {
        synchronized (root) {
            return this.maxQueuedQueries;
        }
    }

    /**
     * Sets the queue capacity. Unlike the concurrency setters, eligibility is not re-evaluated here.
     *
     * @throws IllegalArgumentException if the value is negative
     */
    @Managed
    @Override
    public void setMaxQueuedQueries(int maxQueuedQueries)
    {
        checkArgument(maxQueuedQueries >= 0, "maxQueuedQueries is negative");
        synchronized (root) {
            this.maxQueuedQueries = maxQueuedQueries;
        }
    }

    /**
     * Returns the counter of seconds elapsed between consecutive sub group query starts. The counter
     * instance is final, so it is returned without locking.
     */
    @Managed
    @Nested
    public CounterStat getTimeBetweenStartsSec()
    {
        return this.timeBetweenStartsSec;
    }

    /**
     * Returns this group's scheduling weight.
     */
    @Override
    public int getSchedulingWeight()
    {
        synchronized (root) {
            return this.schedulingWeight;
        }
    }

    /**
     * Sets this group's scheduling weight. If the parent schedules with {@code WEIGHTED} and this group
     * is currently in the parent's eligible queue, the group's entry there is refreshed.
     *
     * @throws IllegalArgumentException if {@code weight} is not positive
     */
    @Override
    public void setSchedulingWeight(int weight)
    {
        checkArgument(weight > 0, "weight must be positive");
        synchronized (root) {
            schedulingWeight = weight;
            if (!parent.isPresent()) {
                return;
            }
            InternalResourceGroup parentGroup = parent.get();
            boolean parentIsWeighted = parentGroup.schedulingPolicy == WEIGHTED;
            if (parentIsWeighted && parentGroup.eligibleSubGroups.contains(this)) {
                parentGroup.addOrUpdateSubGroup(this);
            }
        }
    }

    /**
     * Returns the policy used to order this group's eligible sub groups and queued queries.
     */
    @Override
    public SchedulingPolicy getSchedulingPolicy()
    {
        synchronized (root) {
            return this.schedulingPolicy;
        }
    }

    /**
     * Switches the policy used to order this group's eligible sub groups and its own queued queries.
     * <p>
     * Both queues are rebuilt with the implementation that matches {@code policy}, and every waiting
     * entry is moved across. Choosing {@code QUERY_PRIORITY} also forces it on every existing sub
     * group. A group whose parent uses {@code QUERY_PRIORITY} must use it too.
     *
     * @param policy the new scheduling policy
     * @throws IllegalArgumentException if the parent uses {@code QUERY_PRIORITY} and {@code policy} differs
     * @throws UnsupportedOperationException if {@code policy} is not handled by the switch below
     */
    @Override
    public void setSchedulingPolicy(SchedulingPolicy policy)
    {
        synchronized (root) {
            // Nothing to rebuild when the policy is unchanged
            if (policy == schedulingPolicy) {
                return;
            }

            if (parent.isPresent() && parent.get().schedulingPolicy == QUERY_PRIORITY) {
                checkArgument(policy == QUERY_PRIORITY, "Parent of %s uses query priority scheduling, so %s must also", id, id);
            }

            // Switch to the appropriate queue implementation to implement the desired policy
            Queue<InternalResourceGroup> queue;
            TieredQueue<ManagedQueryExecution> queryQueue;
            switch (policy) {
                case FAIR:
                    queue = new FifoQueue<>();
                    queryQueue = new TieredQueue<>(FifoQueue::new);
                    break;
                case WEIGHTED:
                    queue = new StochasticPriorityQueue<>();
                    queryQueue = new TieredQueue<>(StochasticPriorityQueue::new);
                    break;
                case WEIGHTED_FAIR:
                    queue = new WeightedFairQueue<>();
                    queryQueue = new TieredQueue<>(IndexedPriorityQueue::new);
                    break;
                case QUERY_PRIORITY:
                    // Sub groups must use query priority to ensure ordering
                    for (InternalResourceGroup group : subGroups.values()) {
                        group.setSchedulingPolicy(QUERY_PRIORITY);
                    }
                    queue = new IndexedPriorityQueue<>();
                    queryQueue = new TieredQueue<>(IndexedPriorityQueue::new);
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported scheduling policy: " + policy);
            }
            // The policy is updated before the drain below, so any code reading schedulingPolicy sees the new value
            schedulingPolicy = policy;
            // Drain every eligible sub group from the old queue into the new one
            while (!eligibleSubGroups.isEmpty()) {
                InternalResourceGroup group = eligibleSubGroups.poll();
                addOrUpdateSubGroup(queue, group);
            }
            eligibleSubGroups = queue;
            // Re-queue every waiting query, keyed by its session's query priority.
            // NOTE(review): retries originally inserted via prioritize() (see enqueueQuery) are re-added
            // with addOrUpdate() here; TieredQueue is not in this view — confirm whether retry precedence survives.
            while (!queuedQueries.isEmpty()) {
                ManagedQueryExecution query = queuedQueries.poll();
                queryQueue.addOrUpdate(query, getQueryPriority(query.getSession()));
            }
            queuedQueries = queryQueue;
        }
    }

    /**
     * Returns whether this group is flagged for JMX export.
     */
    @Override
    public boolean getJmxExport()
    {
        synchronized (root) {
            return this.jmxExport;
        }
    }

    /**
     * Records the JMX export flag under the root lock, then notifies the export listener after the
     * lock is released.
     */
    @Override
    public void setJmxExport(boolean export)
    {
        synchronized (root) {
            this.jmxExport = export;
        }
        // Deliberately outside the synchronized block.
        jmxExportListener.accept(this, export);
    }

    /**
     * Sets the limits attached to every query subsequently admitted by {@link #run}.
     * Use {@code ResourceGroupQueryLimits.NO_LIMITS} (the default) for "no limits" rather than null.
     *
     * @param perQueryLimits the limits to apply; must not be null
     * @throws NullPointerException if {@code perQueryLimits} is null
     */
    @Override
    public void setPerQueryLimits(ResourceGroupQueryLimits perQueryLimits)
    {
        // Reject null eagerly: run() pushes this value into every admitted query, so a null here
        // would otherwise surface far away from the faulty configuration call.
        requireNonNull(perQueryLimits, "perQueryLimits is null");
        synchronized (root) {
            this.perQueryLimits = perQueryLimits;
        }
    }

    /**
     * Returns the limits attached to queries admitted by {@link #run}. The result is never null.
     */
    @Override
    public ResourceGroupQueryLimits getPerQueryLimits()
    {
        synchronized (root) {
            return perQueryLimits;
        }
    }

    /**
     * Returns the direct child group called {@code name}, creating it on first use.
     * <p>
     * A new child shares this group's JMX listener, executor, runtime-info function,
     * resource-manager predicate and node manager. It is static only when this group is static and
     * {@code staticSegment} is true. If this group schedules by query priority, the child is switched
     * to query priority as well.
     *
     * @param name the child's name segment; must not be null
     * @param staticSegment whether the child's own segment is static
     * @return the existing or newly created child
     * @throws NullPointerException if {@code name} is null
     * @throws IllegalArgumentException if this group currently has running or queued queries
     */
    public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegment)
    {
        requireNonNull(name, "name is null");
        synchronized (root) {
            checkArgument(runningQueries.isEmpty() && queuedQueries.isEmpty(), "Cannot add sub group to %s while queries are running", id);
            // subGroups never holds null values, so a null lookup means "absent" (single map lookup)
            InternalResourceGroup existing = subGroups.get(name);
            if (existing != null) {
                return existing;
            }
            InternalResourceGroup subGroup = new InternalResourceGroup(
                    Optional.of(this),
                    name,
                    jmxExportListener,
                    executor,
                    staticResourceGroup && staticSegment,
                    additionalRuntimeInfo,
                    shouldWaitForResourceManagerUpdate,
                    nodeManager);
            // Sub group must use query priority to ensure ordering
            if (schedulingPolicy == QUERY_PRIORITY) {
                subGroup.setSchedulingPolicy(QUERY_PRIORITY);
            }
            subGroups.put(name, subGroup);
            return subGroup;
        }
    }

    /**
     * Returns the number of running tasks across all running queries in this subtree.
     * <p>
     * Acquires the root lock: {@code runningQueries} and the sub group map are guarded by
     * {@code root}, and iterating the unsynchronized {@code HashSet} while another thread admits or
     * finishes a query is a data race that can throw {@code ConcurrentModificationException}. The
     * monitor is reentrant, so callers that already hold it are unaffected.
     */
    public int getRunningTaskCount()
    {
        synchronized (root) {
            if (subGroups().isEmpty()) {
                return runningQueries.stream()
                        .mapToInt(ManagedQueryExecution::getRunningTaskCount)
                        .sum();
            }

            int taskCount = 0;
            for (InternalResourceGroup subGroup : subGroups()) {
                taskCount += subGroup.getRunningTaskCount();
            }
            return taskCount;
        }
    }

    /**
     * Marks this group and, recursively, every descendant as dirty. Each direct child is also added to
     * this group's set of sub groups whose cached stats must be refreshed.
     */
    protected void setDirty()
    {
        synchronized (root) {
            isDirty.set(true);
            dirtySubGroups.addAll(subGroups());
            for (InternalResourceGroup child : subGroups()) {
                child.setDirty();
            }
        }
    }

    /**
     * Admits {@code query} into this leaf group: it is started right away, queued, or failed.
     * <p>
     * Every group from this one up to the root must allow queueing or running. If none allows either,
     * the query is failed with {@code QueryQueueFullException}. Otherwise the group's per-query limits
     * are applied, and a state listener is registered that calls {@code queryFinished} once the query
     * is done.
     *
     * @param query the query to admit
     * @throws PrestoException with {@code INVALID_RESOURCE_GROUP} if this group has sub groups
     */
    public void run(ManagedQueryExecution query)
    {
        boolean isQueryQueueFull = false;
        synchronized (root) {
            if (!subGroups.isEmpty()) {
                throw new PrestoException(INVALID_RESOURCE_GROUP, format("Cannot add queries to %s. It is not a leaf group.", id));
            }
            // Check all ancestors for capacity
            // (AND each group's canQueueMore/canRunMore from this leaf up to the root)
            InternalResourceGroup group = this;
            boolean canQueue = true;
            boolean canRun = true;
            while (true) {
                canQueue &= group.canQueueMore();
                canRun &= group.canRunMore();
                if (!group.parent.isPresent()) {
                    break;
                }
                group = group.parent.get();
            }
            if (!canQueue && !canRun) {
                // Failure is reported after the lock is released (below).
                // NOTE(review): presumably to avoid running query callbacks while holding root — confirm.
                isQueryQueueFull = true;
            }
            else {
                query.setResourceGroupQueryLimits(perQueryLimits);
                // Start immediately only if allowed to run AND nothing is already waiting here;
                // presumably so a new query does not overtake queries queued earlier.
                if (canRun && queuedQueries.isEmpty()) {
                    startInBackground(query);
                }
                else {
                    enqueueQuery(query);
                }
                query.addStateChangeListener(state -> {
                    if (state.isDone()) {
                        queryFinished(query);
                    }
                });
            }
        }
        if (isQueryQueueFull) {
            query.fail(new QueryQueueFullException(id));
        }
    }

    /**
     * Adds {@code query} to this group's queue using its session's query priority. Retried queries
     * go through {@code prioritize}; others go through {@code addOrUpdate}. Every ancestor's
     * descendant-queued counter is bumped and eligibility is re-evaluated.
     * <p>
     * The caller must already hold the root lock.
     */
    private void enqueueQuery(ManagedQueryExecution query)
    {
        checkState(Thread.holdsLock(root), "Must hold lock to enqueue a query");

        synchronized (root) {
            int priority = getQueryPriority(query.getSession());
            if (!query.isRetry()) {
                queuedQueries.addOrUpdate(query, priority);
            }
            else {
                queuedQueries.prioritize(query, priority);
            }
            for (Optional<InternalResourceGroup> ancestor = parent; ancestor.isPresent(); ancestor = ancestor.get().parent) {
                ancestor.get().descendantQueuedQueries++;
            }
            updateEligibility();
        }
    }

    /**
     * Re-synchronizes this group's entry in its parent's eligible sub group queue, then repeats the
     * check for each ancestor in turn.
     * <p>
     * This method must be called whenever the group's eligibility to run more queries may have changed.
     * The caller must already hold the root lock. The root has no parent, so this is a no-op there.
     */
    private void updateEligibility()
    {
        checkState(Thread.holdsLock(root), "Must hold lock to update eligibility");
        synchronized (root) {
            if (!parent.isPresent()) {
                return;
            }
            if (isEligibleToStartNext()) {
                // Insert into, or refresh the ordering key in, the parent's eligible queue
                parent.get().addOrUpdateSubGroup(this);
            }
            else {
                // Only drop out of the parent's queue once nothing is waiting here or below;
                // a group that is merely at capacity keeps its place.
                if (queuedQueries.isEmpty() && eligibleSubGroups.isEmpty()) {
                    parent.get().eligibleSubGroups.remove(this);
                    // 0 means "no previous start" for the timeBetweenStartsSec statistic
                    lastStartMillis = 0;
                }
            }
            // The parent's own eligibility may depend on this change
            parent.get().updateEligibility();
        }
    }

    /**
     * Moves {@code query} into this group's running set and schedules its start on the executor.
     * <p>
     * Every ancestor's descendant-running counter is incremented, and each group on the path is
     * recorded as dirty in its parent, so the next stats refresh re-reads its memory usage.
     * Eligibility is then re-evaluated, and the current wall-clock time is stored as the last running
     * query start time of this group and every ancestor. The caller must already hold the root lock.
     */
    private void startInBackground(ManagedQueryExecution query)
    {
        checkState(Thread.holdsLock(root), "Must hold lock to start a query");
        synchronized (root) {
            runningQueries.add(query);
            InternalResourceGroup group = this;
            while (group.parent.isPresent()) {
                group.parent.get().descendantRunningQueries++;
                group.parent.get().dirtySubGroups.add(group);
                group = group.parent.get();
            }
            updateEligibility();
            // The actual start is handed off to the executor rather than run on this thread
            executor.execute(query::startWaitingForResources);
            group = this;
            // One timestamp shared by this group and all ancestors
            long lastRunningQueryStartTimeMillis = currentTimeMillis();
            lastRunningQueryStartTime.set(lastRunningQueryStartTimeMillis);
            while (group.parent.isPresent()) {
                group.parent.get().lastRunningQueryStartTime.set(lastRunningQueryStartTimeMillis);
                group = group.parent.get();
            }
        }
    }

    /**
     * Removes a finished query from this group, whether it was running or still queued.
     * <p>
     * Calls for a query that is no longer tracked return immediately, so repeated calls are harmless.
     * The query's total CPU time is charged to this group and every ancestor only when it has no
     * error code or its error is a {@code USER_ERROR}. The matching ancestor counter is decremented,
     * and eligibility is re-evaluated.
     */
    private void queryFinished(ManagedQueryExecution query)
    {
        synchronized (root) {
            if (!runningQueries.contains(query) && !queuedQueries.contains(query)) {
                // Query has already been cleaned up
                return;
            }
            // Only count the CPU time if the query succeeded, or the failure was the fault of the user
            if (!query.getErrorCode().isPresent() || query.getErrorCode().get().getType() == USER_ERROR) {
                InternalResourceGroup group = this;
                while (group != null) {
                    // Saturating add so a huge CPU total cannot overflow into a negative usage
                    group.cpuUsageMillis = saturatedAdd(group.cpuUsageMillis, query.getTotalCpuTime().toMillis());
                    group = group.parent.orElse(null);
                }
            }
            if (runningQueries.contains(query)) {
                runningQueries.remove(query);
                InternalResourceGroup group = this;
                while (group.parent.isPresent()) {
                    group.parent.get().descendantRunningQueries--;
                    group = group.parent.get();
                }
            }
            else {
                // The query finished (e.g. was cancelled or failed) before it ever started running
                queuedQueries.remove(query);
                InternalResourceGroup group = this;
                while (group.parent.isPresent()) {
                    group.parent.get().descendantQueuedQueries--;
                    group = group.parent.get();
                }
            }
            updateEligibility();
        }
    }

    /**
     * Recomputes cached memory usage for this subtree.
     * <p>
     * Memory usage stats are expensive to maintain, so this method must be called periodically to
     * update them. A leaf sums the user memory reservation of its running queries, plus the memory
     * reported by {@code getAdditionalRuntimeInfo()} when present. An intermediate group refreshes
     * only its dirty sub groups and adjusts its own cached total by each one's delta. The caller must
     * already hold the root lock.
     */
    protected void internalRefreshStats()
    {
        checkState(Thread.holdsLock(root), "Must hold lock to refresh stats");
        synchronized (root) {
            if (subGroups.isEmpty()) {
                cachedMemoryUsageBytes = 0;
                for (ManagedQueryExecution query : runningQueries) {
                    cachedMemoryUsageBytes += query.getUserMemoryReservationInBytes();
                }
                Optional<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfo = getAdditionalRuntimeInfo();
                resourceGroupRuntimeInfo.ifPresent(groupRuntimeInfo -> cachedMemoryUsageBytes += groupRuntimeInfo.getMemoryUsageBytes());
            }
            else {
                for (Iterator<InternalResourceGroup> iterator = dirtySubGroups.iterator(); iterator.hasNext(); ) {
                    InternalResourceGroup subGroup = iterator.next();
                    // Swap the sub group's old contribution for its freshly computed one
                    long oldMemoryUsageBytes = subGroup.cachedMemoryUsageBytes;
                    cachedMemoryUsageBytes -= oldMemoryUsageBytes;
                    subGroup.internalRefreshStats();
                    cachedMemoryUsageBytes += subGroup.cachedMemoryUsageBytes;
                    // isDirty() is a method defined elsewhere in this class, distinct from the isDirty flag
                    // read below. NOTE(review): presumably "still has running work" — confirm.
                    if (!subGroup.isDirty()) {
                        iterator.remove();
                    }
                    // Memory changes can flip canRunMore(); an explicit setDirty() also forces a re-check
                    if (oldMemoryUsageBytes != subGroup.cachedMemoryUsageBytes || subGroup.isDirty.get()) {
                        subGroup.updateEligibility();
                        subGroup.isDirty.set(false);
                    }
                }
            }
        }
    }

    /**
     * Credits CPU quota accrued over {@code elapsedSeconds} to this group and, recursively, to every
     * sub-group. Usage is reduced by {@code elapsedSeconds * cpuQuotaGenerationMillisPerSecond}
     * using saturating arithmetic. It is reset to zero if it drops below zero or saturates at
     * {@code Long.MAX_VALUE}. Caller must hold the root lock.
     *
     * @param elapsedSeconds seconds of quota generation to credit
     */
    protected void internalGenerateCpuQuota(long elapsedSeconds)
    {
        checkState(Thread.holdsLock(root), "Must hold lock to generate cpu quota");
        synchronized (root) {
            long generatedQuotaMillis = saturatedMultiply(elapsedSeconds, cpuQuotaGenerationMillisPerSecond);
            long remainingUsageMillis = saturatedSubtract(cpuUsageMillis, generatedQuotaMillis);
            boolean outOfRange = remainingUsageMillis < 0 || remainingUsageMillis == Long.MAX_VALUE;
            cpuUsageMillis = outOfRange ? 0 : remainingUsageMillis;

            subGroups.values().forEach(child -> child.internalGenerateCpuQuota(elapsedSeconds));
        }
    }

    /**
     * Tries to start a single query somewhere in the subtree rooted at this group.
     * <p>
     * Queries queued directly on this group are tried first. Otherwise the next eligible sub-group
     * is polled and asked, recursively, to start one of its queries. Caller must hold the root lock.
     *
     * @return {@code true} if a query was started; {@code false} if {@code canRunMore()} is false,
     *         there is nothing to start, or the polled sub-group did not start a query
     */
    protected boolean internalStartNext()
    {
        checkState(Thread.holdsLock(root), "Must hold lock to find next query");
        synchronized (root) {
            if (!canRunMore()) {
                return false;
            }

            // Queries queued directly on this group take precedence over sub-groups
            ManagedQueryExecution query = queuedQueries.poll();
            if (query != null) {
                startInBackground(query);
                return true;
            }

            // Remove even if the sub group still has queued queries, so that it goes to the back of the queue
            InternalResourceGroup subGroup = eligibleSubGroups.poll();
            if (subGroup == null) {
                return false;
            }
            boolean started = subGroup.internalStartNext();
            if (started) {
                // Record the gap between consecutive starts in whole seconds. It is clamped at 0
                // (e.g. if the wall clock moves backwards) and skipped for the first recorded start.
                long currentTime = System.currentTimeMillis();
                if (lastStartMillis != 0) {
                    timeBetweenStartsSec.update(Math.max(0, (currentTime - lastStartMillis) / 1000));
                }
                lastStartMillis = currentTime;

                // The started query was queued in a descendant, so it no longer counts as queued below us
                descendantQueuedQueries--;

                // Don't call updateEligibility here, as we're in a recursive call, and don't want to repeatedly update our ancestors.
                if (subGroup.isEligibleToStartNext()) {
                    addOrUpdateSubGroup(subGroup);
                }
            }
            else {
                // If subGroup was not able to start a query, add it back so it is not dropped from the
                // eligible set.
                // NOTE(review): this re-adds the sub-group even when subGroup.isEligibleToStartNext()
                // is false. Under a priority-ordered policy it may return to the head of the queue and
                // keep this method returning false while siblings wait. Confirm this is intended
                // (e.g. for sub-groups blocked waiting on a resource manager update).
                addOrUpdateSubGroup(subGroup);
            }

            return started;
        }
    }

    /**
     * Inserts {@code group} into {@code queue}, or updates its key if it is already present.
     * Under {@code WEIGHTED_FAIR} the queue is treated as a {@code WeightedFairQueue} keyed by the
     * group's scheduling weight and aggregated running queries. Under any other policy it is treated
     * as an {@code UpdateablePriorityQueue} keyed by {@code getSubGroupSchedulingPriority}.
     */
    private void addOrUpdateSubGroup(Queue<InternalResourceGroup> queue, InternalResourceGroup group)
    {
        if (schedulingPolicy != WEIGHTED_FAIR) {
            UpdateablePriorityQueue<InternalResourceGroup> priorityQueue = (UpdateablePriorityQueue<InternalResourceGroup>) queue;
            priorityQueue.addOrUpdate(group, getSubGroupSchedulingPriority(schedulingPolicy, group));
            return;
        }
        WeightedFairQueue<InternalResourceGroup> fairQueue = (WeightedFairQueue<InternalResourceGroup>) queue;
        fairQueue.addOrUpdate(group, new Usage(group.getSchedulingWeight(), group.getAggregatedRunningQueries()));
    }

    /**
     * Inserts or repositions {@code group} in this group's own {@code eligibleSubGroups} queue.
     */
    private void addOrUpdateSubGroup(InternalResourceGroup group)
    {
        this.addOrUpdateSubGroup(this.eligibleSubGroups, group);
    }

    /**
     * Returns the ordering key for {@code group} in a priority-ordered sub-group queue: its highest
     * queued query priority under {@code QUERY_PRIORITY}, otherwise its computed scheduling weight.
     */
    private static long getSubGroupSchedulingPriority(SchedulingPolicy policy, InternalResourceGroup group)
    {
        return policy == QUERY_PRIORITY
                ? group.getHighestQueryPriority()
                : group.computeSchedulingWeight();
    }

    /**
     * Returns the scheduling weight used for priority ordering. Groups below their soft concurrency
     * limit are boosted by a factor of {@code Integer.MAX_VALUE}. Groups at or above it use their
     * plain scheduling weight.
     */
    private long computeSchedulingWeight()
    {
        boolean atOrAboveSoftLimit = getAggregatedRunningQueries() >= softConcurrencyLimit;
        return atOrAboveSoftLimit ? schedulingWeight : (long) Integer.MAX_VALUE * schedulingWeight;
    }

    /**
     * Returns whether this group still needs stats refreshing: it (or a descendant) has running
     * queries, or its dirty flag is set. Caller must hold the root lock.
     */
    private boolean isDirty()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            boolean hasRunningWork = runningQueries.size() + descendantRunningQueries > 0;
            return hasRunningWork || isDirty.get();
        }
    }

    /**
     * Returns whether this group could start another query right now: {@code canRunMore()} holds
     * and there is either a directly queued query or an eligible sub-group. Caller must hold the
     * root lock.
     */
    private boolean isEligibleToStartNext()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            return canRunMore() && (!queuedQueries.isEmpty() || !eligibleSubGroups.isEmpty());
        }
    }

    /**
     * Returns the priority of the query at the head of this group's queue, or 0 when nothing is
     * queued. Fails if the low-priority queue is not an {@code IndexedPriorityQueue}. Caller must
     * hold the root lock.
     */
    private int getHighestQueryPriority()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            checkState(queuedQueries.getLowPriorityQueue() instanceof IndexedPriorityQueue, "Queued queries not ordered");
            return queuedQueries.isEmpty()
                    ? 0
                    : getQueryPriority(queuedQueries.peek().getSession());
        }
    }

    /**
     * Returns whether another query may be queued in this group. Queries queued locally and in
     * descendants are counted, plus any queued counts reported through the additional runtime info.
     * The total must stay below {@code maxQueuedQueries}. Caller must hold the root lock.
     */
    private boolean canQueueMore()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            Optional<ResourceGroupRuntimeInfo> remoteInfo = getAdditionalRuntimeInfo();
            if (!remoteInfo.isPresent()) {
                return descendantQueuedQueries + queuedQueries.size() < maxQueuedQueries;
            }
            ResourceGroupRuntimeInfo info = remoteInfo.get();
            return descendantQueuedQueries + queuedQueries.size() + info.getQueuedQueries() + info.getDescendantQueuedQueries() < maxQueuedQueries;
        }
    }

    /**
     * Returns whether this group may start another query. It returns {@code false} if any of these hold:
     * <ul>
     *   <li>CPU usage has reached the hard CPU limit</li>
     *   <li>the group must wait for a resource manager update</li>
     *   <li>the root reports the task limit as exceeded</li>
     * </ul>
     * Otherwise all of the following must hold:
     * <ul>
     *   <li>the running-query count (local, descendants, plus additional runtime info) is below the
     *       CPU-adjusted hard concurrency limit</li>
     *   <li>cached memory usage is within the soft memory limit</li>
     *   <li>{@code running * workersPerQueryLimit} does not exceed the active worker count</li>
     * </ul>
     * Caller must hold the root lock.
     */
    private boolean canRunMore()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            boolean blocked = cpuUsageMillis >= hardCpuLimitMillis
                    || shouldWaitForResourceManagerUpdate()
                    || ((RootInternalResourceGroup) root).isTaskLimitExceeded();
            if (blocked) {
                return false;
            }

            int concurrencyLimit = getHardConcurrencyLimitBasedOnCpuUsage();
            int runningCount = runningQueries.size() + descendantRunningQueries;

            Optional<ResourceGroupRuntimeInfo> remoteInfo = getAdditionalRuntimeInfo();
            if (remoteInfo.isPresent()) {
                ResourceGroupRuntimeInfo info = remoteInfo.get();
                runningCount += info.getRunningQueries() + info.getDescendantRunningQueries();
            }
            int activeWorkers = nodeManager.getAllNodes().getActiveWorkerCount();

            boolean underConcurrencyLimit = runningCount < concurrencyLimit;
            boolean underMemoryLimit = cachedMemoryUsageBytes <= softMemoryLimitBytes;
            boolean enoughWorkers = runningCount * workersPerQueryLimit <= activeWorkers;
            return underConcurrencyLimit && underMemoryLimit && enoughWorkers;
        }
    }

    /**
     * Returns the hard concurrency limit after applying the CPU-usage penalty.
     * <p>
     * Below the soft CPU limit, the configured hard concurrency limit is returned unchanged. At or
     * above it, the limit is scaled down linearly as usage moves from the soft toward the hard CPU
     * limit. It is always reduced by at least one and kept at one or more, so a penalized group can
     * still make progress.
     * <p>
     * The penalty never raises the limit above the configured value, so a group configured with a
     * hard concurrency limit of zero stays at zero. Without that clamp, the "at least one" rule would
     * let such a group start a query once it crossed its soft CPU limit. If the soft and hard CPU
     * limits are equal, the penalty ratio is infinite or NaN, and the clamps below reduce it to
     * {@code min(configured, 1)}. Caller must hold the root lock.
     *
     * @return the effective hard concurrency limit, never greater than the configured limit
     */
    protected int getHardConcurrencyLimitBasedOnCpuUsage()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            int configuredLimit = this.hardConcurrencyLimit;
            if (cpuUsageMillis < softCpuLimitMillis) {
                return configuredLimit;
            }

            // TODO: Consider whether cpu limit math should be performed on softConcurrency or hardConcurrency
            // Linear penalty between soft and hard limit
            double penalty = (cpuUsageMillis - softCpuLimitMillis) / (double) (hardCpuLimitMillis - softCpuLimitMillis);
            int penalizedLimit = (int) Math.floor(configuredLimit * (1 - penalty));
            // Always penalize by at least one
            penalizedLimit = min(configuredLimit - 1, penalizedLimit);
            // Always allow at least one running query...
            penalizedLimit = Math.max(1, penalizedLimit);
            // ...but never more than the configured limit (a limit of 0 must stay 0)
            return min(configuredLimit, penalizedLimit);
        }
    }

    /**
     * Returns this group's direct sub-groups, as obtained from {@code subGroups.values()} under the
     * root lock.
     * <p>
     * NOTE(review): if {@code subGroups} is a {@code java.util.Map}, this is a live view backed by
     * the map rather than a snapshot. Iteration after this method returns is then not protected by
     * the root lock. Confirm callers synchronize on the root or tolerate concurrent modification.
     */
    public Collection<InternalResourceGroup> subGroups()
    {
        Collection<InternalResourceGroup> children;
        synchronized (root) {
            children = subGroups.values();
        }
        return children;
    }

    /**
     * Returns the value currently held in {@code lastRunningQueryStartTime}. Caller must hold the
     * root lock.
     */
    protected long getLastRunningQueryStartTime()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        long startTime;
        synchronized (root) {
            startTime = lastRunningQueryStartTime.get();
        }
        return startTime;
    }

    /**
     * Evaluates the injected {@code shouldWaitForResourceManagerUpdate} predicate against this
     * group. Caller must hold the root lock.
     */
    private boolean shouldWaitForResourceManagerUpdate()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            InternalResourceGroup self = this;
            return this.shouldWaitForResourceManagerUpdate.test(self);
        }
    }

    /**
     * Looks up additional runtime info for this group by applying the injected
     * {@code additionalRuntimeInfo} function to this group's id. Caller must hold the root lock.
     */
    private Optional<ResourceGroupRuntimeInfo> getAdditionalRuntimeInfo()
    {
        checkState(Thread.holdsLock(root), "Must hold lock");
        synchronized (root) {
            ResourceGroupId groupId = getId();
            return additionalRuntimeInfo.apply(groupId);
        }
    }

    /**
     * Returns a debug representation containing only the group id.
     */
    @Override
    public String toString()
    {
        return toStringHelper(this).add("id", id).toString();
    }

    /**
     * Two resource groups are equal when they are both {@code InternalResourceGroup}s with equal ids.
     */
    @Override
    public boolean equals(Object o)
    {
        if (o == this) {
            return true;
        }
        return o instanceof InternalResourceGroup
                && Objects.equals(id, ((InternalResourceGroup) o).id);
    }

    /**
     * Hash derived solely from the group id, consistent with {@link #equals(Object)}.
     */
    @Override
    public int hashCode()
    {
        return Objects.hash(this.id);
    }

    /**
     * Root of a resource-group tree: a resource group with no parent that exposes the entry points
     * for refreshing stats, starting queued queries, and generating CPU quota for the whole tree. It
     * also carries a task-limit-exceeded flag that {@code canRunMore()} consults to block new starts.
     */
    @ThreadSafe
    public static final class RootInternalResourceGroup
            extends InternalResourceGroup
    {
        // final: the flag object is assigned once at construction, and a final field guarantees
        // threads calling setTaskLimitExceeded/isTaskLimitExceeded see the initialized instance
        private final AtomicBoolean taskLimitExceeded = new AtomicBoolean();

        public RootInternalResourceGroup(
                String name,
                BiConsumer<InternalResourceGroup, Boolean> jmxExportListener,
                Executor executor,
                Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
                Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
                InternalNodeManager nodeManager)
        {
            // No parent: this group is the root of its tree
            super(Optional.empty(),
                    name,
                    jmxExportListener,
                    executor,
                    true,
                    additionalRuntimeInfo,
                    shouldWaitForResourceManagerUpdate,
                    nodeManager);
        }

        /**
         * Refreshes cached stats for the tree, then starts queued queries one at a time until
         * {@code internalStartNext()} reports that no further query could be started.
         * Synchronized on this instance, which presumably serves as the tree's root lock.
         */
        public synchronized void processQueuedQueries()
        {
            internalRefreshStats();

            while (internalStartNext()) {
                // start all the queries we can
            }
        }

        /**
         * Credits {@code elapsedSeconds} worth of CPU quota to every group in the tree.
         * Does nothing unless {@code elapsedSeconds} is positive.
         */
        public synchronized void generateCpuQuota(long elapsedSeconds)
        {
            if (elapsedSeconds > 0) {
                internalGenerateCpuQuota(elapsedSeconds);
            }
        }

        /**
         * Sets whether the task limit is exceeded; while {@code true}, no group in this tree may
         * start new queries.
         */
        public void setTaskLimitExceeded(boolean exceeded)
        {
            taskLimitExceeded.set(exceeded);
        }

        private boolean isTaskLimitExceeded()
        {
            return taskLimitExceeded.get();
        }
    }
}