ClusterMemoryPool.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.memory;

import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
import com.facebook.presto.spi.memory.MemoryAllocation;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
import com.google.common.collect.ImmutableMap;
import org.weakref.jmx.Managed;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class ClusterMemoryPool
{
    private final MemoryPoolId id;

    @GuardedBy("this")
    private long totalDistributedBytes;

    @GuardedBy("this")
    private long reservedDistributedBytes;

    @GuardedBy("this")
    private long reservedRevocableDistributedBytes;

    @GuardedBy("this")
    private int nodes;

    @GuardedBy("this")
    private int blockedNodes;

    @GuardedBy("this")
    private int assignedQueries;

    // Does not include queries with zero memory usage
    @GuardedBy("this")
    private final Map<QueryId, Long> queryMemoryReservations = new HashMap<>();

    @GuardedBy("this")
    private final Map<QueryId, List<MemoryAllocation>> queryMemoryAllocations = new HashMap<>();

    @GuardedBy("this")
    private final Map<QueryId, Long> queryMemoryRevocableReservations = new HashMap<>();

    public ClusterMemoryPool(MemoryPoolId id)
    {
        this.id = requireNonNull(id, "id is null");
    }

    public synchronized MemoryPoolInfo getInfo()
    {
        return new MemoryPoolInfo(
                totalDistributedBytes,
                reservedDistributedBytes,
                reservedRevocableDistributedBytes,
                ImmutableMap.copyOf(queryMemoryReservations),
                ImmutableMap.copyOf(queryMemoryAllocations),
                ImmutableMap.copyOf(queryMemoryRevocableReservations));
    }

    public synchronized ClusterMemoryPoolInfo getClusterInfo()
    {
        return getClusterInfo(Optional.empty());
    }

    public synchronized ClusterMemoryPoolInfo getClusterInfo(Optional<QueryId> largestMemoryQuery)
    {
        return new ClusterMemoryPoolInfo(getInfo(), blockedNodes, assignedQueries, largestMemoryQuery, Optional.empty());
    }

    public synchronized ClusterMemoryPoolInfo getClusterInfo(Optional<QueryId> largestMemoryQuery, Optional<List<QueryId>> runningQueries)
    {
        return new ClusterMemoryPoolInfo(getInfo(), blockedNodes, assignedQueries, largestMemoryQuery, runningQueries);
    }

    public MemoryPoolId getId()
    {
        return id;
    }

    @Managed
    public synchronized long getTotalDistributedBytes()
    {
        return totalDistributedBytes;
    }

    @Managed
    public synchronized long getFreeDistributedBytes()
    {
        return totalDistributedBytes - reservedDistributedBytes - reservedRevocableDistributedBytes;
    }

    @Managed
    public synchronized long getReservedDistributedBytes()
    {
        return reservedDistributedBytes;
    }

    @Managed
    public synchronized long getReservedRevocableDistributedBytes()
    {
        return reservedRevocableDistributedBytes;
    }

    @Managed
    public synchronized int getNodes()
    {
        return nodes;
    }

    @Managed
    public synchronized int getBlockedNodes()
    {
        return blockedNodes;
    }

    @Managed
    public synchronized int getAssignedQueries()
    {
        return assignedQueries;
    }

    public synchronized Map<QueryId, Long> getQueryMemoryReservations()
    {
        return ImmutableMap.copyOf(queryMemoryReservations);
    }

    public synchronized Map<QueryId, Long> getQueryMemoryRevocableReservations()
    {
        return ImmutableMap.copyOf(queryMemoryRevocableReservations);
    }

    public synchronized void update(List<MemoryInfo> memoryInfos, int assignedQueries)
    {
        nodes = 0;
        blockedNodes = 0;
        totalDistributedBytes = 0;
        reservedDistributedBytes = 0;
        reservedRevocableDistributedBytes = 0;
        this.assignedQueries = assignedQueries;
        this.queryMemoryReservations.clear();
        this.queryMemoryAllocations.clear();
        this.queryMemoryRevocableReservations.clear();

        for (MemoryInfo info : memoryInfos) {
            MemoryPoolInfo poolInfo = info.getPools().get(id);
            if (poolInfo != null) {
                nodes++;
                if (poolInfo.getFreeBytes() + poolInfo.getReservedRevocableBytes() <= 0) {
                    blockedNodes++;
                }
                totalDistributedBytes += poolInfo.getMaxBytes();
                reservedDistributedBytes += poolInfo.getReservedBytes();
                reservedRevocableDistributedBytes += poolInfo.getReservedRevocableBytes();
                for (Map.Entry<QueryId, Long> entry : poolInfo.getQueryMemoryReservations().entrySet()) {
                    queryMemoryReservations.merge(entry.getKey(), entry.getValue(), Long::sum);
                }
                for (Map.Entry<QueryId, List<MemoryAllocation>> entry : poolInfo.getQueryMemoryAllocations().entrySet()) {
                    queryMemoryAllocations.merge(entry.getKey(), entry.getValue(), this::mergeQueryAllocations);
                }
                for (Map.Entry<QueryId, Long> entry : poolInfo.getQueryMemoryRevocableReservations().entrySet()) {
                    queryMemoryRevocableReservations.merge(entry.getKey(), entry.getValue(), Long::sum);
                }
            }
        }
    }

    private List<MemoryAllocation> mergeQueryAllocations(List<MemoryAllocation> left, List<MemoryAllocation> right)
    {
        requireNonNull(left, "left is null");
        requireNonNull(right, "right is null");

        Map<String, MemoryAllocation> mergedAllocations = new HashMap<>();

        for (MemoryAllocation allocation : left) {
            mergedAllocations.put(allocation.getTag(), allocation);
        }

        for (MemoryAllocation allocation : right) {
            mergedAllocations.merge(
                    allocation.getTag(),
                    allocation,
                    (a, b) -> new MemoryAllocation(a.getTag(), a.getAllocation() + b.getAllocation()));
        }

        return new ArrayList<>(mergedAllocations.values());
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ClusterMemoryPool that = (ClusterMemoryPool) o;
        return Objects.equals(id, that.id);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(id);
    }

    @Override
    public synchronized String toString()
    {
        return toStringHelper(this)
                .add("id", id)
                .add("totalDistributedBytes", totalDistributedBytes)
                .add("freeDistributedBytes", getFreeDistributedBytes())
                .add("reservedDistributedBytes", reservedDistributedBytes)
                .add("reservedRevocableDistributedBytes", reservedRevocableDistributedBytes)
                .add("nodes", nodes)
                .add("blockedNodes", blockedNodes)
                .add("assignedQueries", assignedQueries)
                .add("queryMemoryReservations", queryMemoryReservations)
                .add("queryMemoryAllocations", queryMemoryAllocations)
                .add("queryMemoryRevocableReservations", queryMemoryRevocableReservations)
                .toString();
    }
}