LocalMemoryManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.memory;

import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.memory.NodeMemoryConfig.QUERY_MAX_MEMORY_PER_NODE_CONFIG;
import static com.facebook.presto.memory.NodeMemoryConfig.QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG;
import static com.facebook.presto.memory.NodeMemoryConfig.QUERY_SOFT_MAX_MEMORY_PER_NODE_CONFIG;
import static com.facebook.presto.memory.NodeMemoryConfig.QUERY_SOFT_MAX_TOTAL_MEMORY_PER_NODE_CONFIG;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public final class LocalMemoryManager
{
    public static final MemoryPoolId GENERAL_POOL = new MemoryPoolId("general");
    public static final MemoryPoolId RESERVED_POOL = new MemoryPoolId("reserved");

    private DataSize maxMemory;
    private Map<MemoryPoolId, MemoryPool> pools;

    @Inject
    public LocalMemoryManager(NodeMemoryConfig config)
    {
        this(config, Runtime.getRuntime().maxMemory());
    }

    @VisibleForTesting
    public LocalMemoryManager(NodeMemoryConfig config, long availableMemory)
    {
        requireNonNull(config, "config is null");
        configureMemoryPools(config, availableMemory);
    }

    private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
    {
        validateHeapHeadroom(config, availableMemory);
        maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
        checkArgument(
                config.getMaxQueryMemoryPerNode().toBytes() <= config.getMaxQueryTotalMemoryPerNode().toBytes(),
                "Max query memory per node (%s) cannot be greater than the max query total memory per node (%s).",
                QUERY_MAX_MEMORY_PER_NODE_CONFIG,
                QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG);
        checkArgument(
                config.getMaxQueryMemoryPerNode().toBytes() >= config.getSoftMaxQueryMemoryPerNode().toBytes(),
                "Max query memory per node (%s) must be >= soft limit (%s).",
                QUERY_MAX_MEMORY_PER_NODE_CONFIG,
                QUERY_SOFT_MAX_MEMORY_PER_NODE_CONFIG);
        checkArgument(
                config.getMaxQueryTotalMemoryPerNode().toBytes() >= config.getSoftMaxQueryTotalMemoryPerNode().toBytes(),
                "Max query total memory per node (%s) must be >= soft limit (%s).",
                QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG,
                QUERY_SOFT_MAX_TOTAL_MEMORY_PER_NODE_CONFIG);
        ImmutableMap.Builder<MemoryPoolId, MemoryPool> builder = ImmutableMap.builder();
        long generalPoolSize = maxMemory.toBytes();
        if (config.isReservedPoolEnabled()) {
            builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryTotalMemoryPerNode()));
            generalPoolSize -= config.getMaxQueryTotalMemoryPerNode().toBytes();
        }
        verify(generalPoolSize > 0, "general memory pool size is 0");
        builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, new DataSize(generalPoolSize, BYTE)));
        this.pools = builder.build();
    }

    @VisibleForTesting
    static void validateHeapHeadroom(NodeMemoryConfig config, long availableMemory)
    {
        long maxQueryTotalMemoryPerNode = config.getMaxQueryTotalMemoryPerNode().toBytes();
        long heapHeadroom = config.getHeapHeadroom().toBytes();
        // (availableMemory - maxQueryTotalMemoryPerNode) bytes will be available for the general pool and the
        // headroom/untracked allocations, so the heapHeadroom cannot be larger than that space.
        if (heapHeadroom < 0 || heapHeadroom + maxQueryTotalMemoryPerNode > availableMemory) {
            throw new IllegalArgumentException(
                    format("Invalid memory configuration. The sum of max total query memory per node (%s) and heap headroom (%s) cannot be larger than the available heap memory (%s)",
                            maxQueryTotalMemoryPerNode,
                            heapHeadroom,
                            availableMemory));
        }
    }

    public MemoryInfo getInfo()
    {
        ImmutableMap.Builder<MemoryPoolId, MemoryPoolInfo> builder = ImmutableMap.builder();
        for (Map.Entry<MemoryPoolId, MemoryPool> entry : pools.entrySet()) {
            builder.put(entry.getKey(), entry.getValue().getInfo());
        }
        return new MemoryInfo(maxMemory, builder.build());
    }

    public List<MemoryPool> getPools()
    {
        return ImmutableList.copyOf(pools.values());
    }

    public MemoryPool getGeneralPool()
    {
        return pools.get(GENERAL_POOL);
    }

    public Optional<MemoryPool> getReservedPool()
    {
        return Optional.ofNullable(pools.get(RESERVED_POOL));
    }
}