ReloadingResourceGroupConfigurationManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.resourceGroups.reloading;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.resourceGroups.AbstractResourceConfigurationManager;
import com.facebook.presto.resourceGroups.ManagerSpec;
import com.facebook.presto.resourceGroups.ResourceGroupIdTemplate;
import com.facebook.presto.resourceGroups.ResourceGroupSelector;
import com.facebook.presto.resourceGroups.ResourceGroupSpec;
import com.facebook.presto.resourceGroups.VariableMap;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
import com.facebook.presto.spi.resourceGroups.ResourceGroup;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_UNAVAILABLE;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.Duration.succinctNanos;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Resource group configuration manager that obtains its {@link ManagerSpec} from a
 * {@link ManagerSpecProvider}, loads it once at construction time and, after
 * {@link #start()} is called, reloads it on a fixed 10 second delay.
 *
 * <p>Each successful reload replaces the published root groups, selectors and CPU quota
 * period, re-applies changed specs to the groups this manager has already configured, and
 * disables groups whose spec disappeared from the source. Readers ({@link #match},
 * {@link #getSelectors()}, {@link #getRootGroups()}) fail with
 * {@code CONFIGURATION_UNAVAILABLE} when no configuration was ever loaded or the last
 * successful load is older than the configured maximum refresh interval.
 */
public class ReloadingResourceGroupConfigurationManager
        extends AbstractResourceConfigurationManager
{
    private static final Logger log = Logger.get(ReloadingResourceGroupConfigurationManager.class);

    // Every group passed to configure(), keyed by its id.
    private final ConcurrentMap<ResourceGroupId, ResourceGroup> groups = new ConcurrentHashMap<>();
    // Specs from the last successful load, keyed by their fully expanded template id.
    @GuardedBy("this")
    private Map<ResourceGroupIdTemplate, ResourceGroupSpec> resourceGroupSpecs = new HashMap<>();
    // For every spec template, the ids of the concrete groups configured from it.
    // The value lists are appended to by configure() and iterated by load() on another thread,
    // so they must be thread-safe themselves (see configure()).
    private final ConcurrentMap<ResourceGroupIdTemplate, List<ResourceGroupId>> configuredGroups = new ConcurrentHashMap<>();
    private final AtomicReference<List<ResourceGroupSpec>> rootGroups = new AtomicReference<>(ImmutableList.of());
    // Starts out empty rather than null so readers never dereference null when the initial load fails.
    private final AtomicReference<List<ResourceGroupSelector>> selectors = new AtomicReference<>(ImmutableList.of());
    private final AtomicReference<Optional<Duration>> cpuQuotaPeriod = new AtomicReference<>(Optional.empty());
    private final ManagerSpecProvider managerSpecProvider;
    private final ScheduledExecutorService configExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("DbResourceGroupConfigurationManager"));
    private final AtomicBoolean started = new AtomicBoolean();
    // System.nanoTime() of the last successful load; 0 means "never loaded successfully".
    private final AtomicLong lastRefresh = new AtomicLong();
    private final Duration maxRefreshInterval;
    private final boolean exactMatchSelectorEnabled;

    private final CounterStat refreshFailures = new CounterStat();

    /**
     * Creates the manager and performs an initial, best-effort {@link #load()}.
     * A failing initial load does not fail construction; it is logged and counted in
     * {@link #getRefreshFailures()}.
     *
     * @param memoryPoolManager passed through to the superclass; must not be null
     * @param config supplies the maximum refresh interval and the exact-match-selector switch
     * @param managerSpecProvider source of the {@link ManagerSpec} and exact match selectors; must not be null
     */
    @Inject
    public ReloadingResourceGroupConfigurationManager(ClusterMemoryPoolManager memoryPoolManager, ReloadingResourceGroupConfig config, ManagerSpecProvider managerSpecProvider)
    {
        super(requireNonNull(memoryPoolManager, "memoryPoolManager is null"));
        requireNonNull(config, "config is null");
        this.maxRefreshInterval = config.getMaxRefreshInterval();
        this.exactMatchSelectorEnabled = config.getExactMatchSelectorEnabled();
        this.managerSpecProvider = requireNonNull(managerSpecProvider, "provider is null");
        load();
    }

    /**
     * Returns the CPU quota period published by the last successful load
     * (empty until a load succeeds).
     */
    @Override
    protected Optional<Duration> getCpuQuotaPeriod()
    {
        return cpuQuotaPeriod.get();
    }

    /**
     * Returns the root group specs published by the last successful load.
     *
     * @throws PrestoException {@code CONFIGURATION_UNAVAILABLE} if the configuration is missing or stale,
     *         {@code CONFIGURATION_INVALID} if the loaded configuration has no root groups
     */
    @Override
    protected List<ResourceGroupSpec> getRootGroups()
    {
        checkMaxRefreshInterval();

        // Read the reference once so the emptiness check and the returned list are the same snapshot.
        List<ResourceGroupSpec> currentRootGroups = rootGroups.get();
        if (currentRootGroups.isEmpty()) {
            throw new PrestoException(CONFIGURATION_INVALID, "No root groups are configured");
        }

        return currentRootGroups;
    }

    /**
     * Stops the background reload executor.
     */
    @PreDestroy
    public void destroy()
    {
        configExecutor.shutdownNow();
    }

    /**
     * Schedules {@link #load()} to run every 10 seconds (fixed delay, first run after 10 seconds).
     * Calling this more than once has no additional effect.
     */
    @PostConstruct
    public void start()
    {
        if (started.compareAndSet(false, true)) {
            configExecutor.scheduleWithFixedDelay(this::load, 10, 10, TimeUnit.SECONDS);
        }
    }

    /**
     * Applies the matching spec to {@code group} and remembers the group so that later
     * reloads can reconfigure or disable it.
     *
     * @param group the group to configure
     * @param criteria the selection context the group was created for
     */
    @Override
    public void configure(ResourceGroup group, SelectionContext<VariableMap> criteria)
    {
        Map.Entry<ResourceGroupIdTemplate, ResourceGroupSpec> entry = getMatchingSpec(group, criteria);
        if (groups.putIfAbsent(group.getId(), group) == null) {
            // If a new spec replaces the spec returned from getMatchingSpec the group will be reconfigured on the next run of load().
            // CopyOnWriteArrayList: this add is not guarded by the manager lock and may run while load()
            // iterates the same list on the reload thread; a plain LinkedList is unsafe under that access.
            configuredGroups.computeIfAbsent(entry.getKey(), template -> new CopyOnWriteArrayList<>()).add(group.getId());
        }
        synchronized (getRootGroup(group.getId())) {
            configureGroup(group, entry.getValue());
        }
    }

    /**
     * Returns the selection context of the first selector that matches {@code criteria},
     * or empty if none matches.
     *
     * @throws PrestoException {@code CONFIGURATION_UNAVAILABLE} if the configuration is missing or stale,
     *         {@code CONFIGURATION_INVALID} if no selectors are configured
     */
    @Override
    public Optional<SelectionContext<VariableMap>> match(SelectionCriteria criteria)
    {
        return currentSelectors().stream()
                .map(s -> s.match(criteria))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

    /**
     * Returns the selectors published by the last successful load.
     *
     * @throws PrestoException {@code CONFIGURATION_UNAVAILABLE} if the configuration is missing or stale,
     *         {@code CONFIGURATION_INVALID} if no selectors are configured
     */
    @VisibleForTesting
    public List<ResourceGroupSelector> getSelectors()
    {
        return currentSelectors();
    }

    // Shared freshness + non-empty check for match() and getSelectors(); returns one consistent snapshot.
    private List<ResourceGroupSelector> currentSelectors()
    {
        checkMaxRefreshInterval();

        List<ResourceGroupSelector> current = selectors.get();
        if (current.isEmpty()) {
            throw new PrestoException(CONFIGURATION_INVALID, "No selectors are configured");
        }
        return current;
    }

    /**
     * Fetches the current {@link ManagerSpec} from the provider and, if everything can be
     * built, publishes it and applies the differences to already-configured groups.
     *
     * <p>Failures are never propagated: they are logged, counted in {@link #getRefreshFailures()},
     * and the last-refresh timestamp is left untouched.
     */
    @VisibleForTesting
    public synchronized void load()
    {
        try {
            ManagerSpec managerSpec = managerSpecProvider.getManagerSpec();
            validateRootGroups(managerSpec);

            List<ResourceGroupSpec> newRootGroups = managerSpec.getRootGroups();

            Map<ResourceGroupIdTemplate, ResourceGroupSpec> newSpecs = new HashMap<>();
            buildResourceGroupSpecsMap(newSpecs, newRootGroups, Optional.empty());

            // Build the selectors BEFORE touching any published state. If this step throws, the
            // previous specs stay in place, so the next reload still sees the pending changes
            // when it diffs against them.
            ImmutableList.Builder<ResourceGroupSelector> selectorsBuilder = ImmutableList.builder();
            if (exactMatchSelectorEnabled) {
                selectorsBuilder.addAll(managerSpecProvider.getExactMatchSelectors());
            }
            selectorsBuilder.addAll(buildSelectors(managerSpec));
            List<ResourceGroupSelector> newSelectors = selectorsBuilder.build();

            // Diff against the previously loaded specs. The deleted set is copied so it does not
            // remain a live view over the old and new key sets.
            Set<ResourceGroupIdTemplate> deletedSpecs = new HashSet<>(Sets.difference(this.resourceGroupSpecs.keySet(), newSpecs.keySet()));
            Set<ResourceGroupIdTemplate> changedSpecs = new HashSet<>();
            for (Map.Entry<ResourceGroupIdTemplate, ResourceGroupSpec> entry : newSpecs.entrySet()) {
                // Specs that did not exist before are compared against null and are expected to be reported as changed.
                if (!entry.getValue().sameConfig(this.resourceGroupSpecs.get(entry.getKey()))) {
                    changedSpecs.add(entry.getKey());
                }
            }

            this.resourceGroupSpecs = newSpecs;
            this.cpuQuotaPeriod.set(managerSpec.getCpuQuotaPeriod());
            this.rootGroups.set(newRootGroups);
            this.selectors.set(newSelectors);

            configureChangedGroups(changedSpecs);
            disableDeletedGroups(deletedSpecs);

            // 0 is the "never loaded" sentinel; nanoTime values may legitimately be negative.
            if (lastRefresh.get() != 0) {
                for (ResourceGroupIdTemplate deleted : deletedSpecs) {
                    log.info("Resource group spec deleted %s", deleted);
                }
                for (ResourceGroupIdTemplate changed : changedSpecs) {
                    log.info("Resource group spec %s changed to %s", changed, newSpecs.get(changed));
                }
            }
            else {
                log.info("Loaded %s selectors and %s resource groups from source", newSelectors.size(), newSpecs.size());
            }

            lastRefresh.set(System.nanoTime());
        }
        catch (Throwable e) {
            // Deliberately best-effort: an exception escaping a scheduled task would cancel all future reloads.
            refreshFailures.update(1);
            log.error(e, "Error loading configuration from source");
            long lastRefreshNanos = lastRefresh.get();
            if (lastRefreshNanos != 0) {
                log.debug("Last successful configuration loading was %s ago", succinctNanos(System.nanoTime() - lastRefreshNanos).toString());
            }
        }
    }

    // Recursively flattens the spec tree into resourceGroupSpecs, keyed by the template id built from
    // the chain of group names from the root down to each spec.
    private synchronized void buildResourceGroupSpecsMap(Map<ResourceGroupIdTemplate, ResourceGroupSpec> resourceGroupSpecs, List<ResourceGroupSpec> childResourceGroups, Optional<ResourceGroupIdTemplate> parentId)
    {
        for (ResourceGroupSpec resourceGroupSpec : childResourceGroups) {
            String name = resourceGroupSpec.getName().toString();
            ResourceGroupIdTemplate childId = parentId.isPresent()
                    ? ResourceGroupIdTemplate.forSubGroupNamed(parentId.get(), name)
                    : new ResourceGroupIdTemplate(name);

            if (!resourceGroupSpec.getSubGroups().isEmpty()) {
                buildResourceGroupSpecsMap(resourceGroupSpecs, resourceGroupSpec.getSubGroups(), Optional.of(childId));
            }
            resourceGroupSpecs.put(childId, resourceGroupSpec);
        }
    }

    // Re-applies the current spec to every already-configured group whose template changed,
    // holding the lock of the group's root while doing so (same lock configure() uses).
    private synchronized void configureChangedGroups(Set<ResourceGroupIdTemplate> changedSpecs)
    {
        for (ResourceGroupIdTemplate resourceGroupIdTemplate : changedSpecs) {
            for (ResourceGroupId resourceGroupId : configuredGroups.getOrDefault(resourceGroupIdTemplate, ImmutableList.of())) {
                synchronized (getRootGroup(resourceGroupId)) {
                    configureGroup(groups.get(resourceGroupId), resourceGroupSpecs.get(resourceGroupIdTemplate));
                }
            }
        }
    }

    // Disables every already-configured group whose template is no longer present in the source.
    private synchronized void disableDeletedGroups(Set<ResourceGroupIdTemplate> deletedSpecs)
    {
        for (ResourceGroupIdTemplate resourceGroupIdTemplate : deletedSpecs) {
            for (ResourceGroupId resourceGroupId : configuredGroups.getOrDefault(resourceGroupIdTemplate, ImmutableList.of())) {
                disableGroup(groups.get(resourceGroupId));
            }
        }
    }

    private synchronized void disableGroup(ResourceGroup group)
    {
        // Disable groups that are removed from the source
        group.setHardConcurrencyLimit(0);
        group.setMaxQueuedQueries(0);
    }

    // Walks the parent chain of groupId up to its root id and returns the registered root group.
    // Fails with IllegalStateException (via checkState) if that root was never passed to configure().
    private ResourceGroup getRootGroup(ResourceGroupId groupId)
    {
        ResourceGroupId current = groupId;
        Optional<ResourceGroupId> parent = current.getParent();
        while (parent.isPresent()) {
            current = parent.get();
            parent = current.getParent();
        }

        ResourceGroup rootResourceGroup = groups.get(current);
        checkState(rootResourceGroup != null, "%s is missing in the groups map for groupId %s", current, groupId);
        return rootResourceGroup;
    }

    // Throws CONFIGURATION_UNAVAILABLE when no configuration was ever loaded or the last successful
    // load is older than maxRefreshInterval.
    private void checkMaxRefreshInterval()
    {
        // Single read so the staleness decision and the message describe the same timestamp.
        long lastRefreshNanos = lastRefresh.get();
        // System.nanoTime() has an arbitrary origin, so "now - 0" says nothing about staleness;
        // the never-loaded case has to be detected explicitly.
        boolean neverLoaded = lastRefreshNanos == 0;
        long elapsedNanos = System.nanoTime() - lastRefreshNanos;

        if (neverLoaded || elapsedNanos > maxRefreshInterval.toMillis() * MILLISECONDS.toNanos(1)) {
            String message = "Resource group configuration cannot be fetched from source.";
            if (!neverLoaded) {
                message += format(" Current resource group configuration is loaded %s ago", succinctNanos(elapsedNanos).toString());
            }

            throw new PrestoException(CONFIGURATION_UNAVAILABLE, message);
        }
    }

    /**
     * Returns the counter that {@link #load()} increments each time a reload fails.
     */
    @Managed
    @Nested
    public CounterStat getRefreshFailures()
    {
        return refreshFailures;
    }
}