TempStorageManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.storage;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.connector.ConnectorAwareNodeManager;
import com.facebook.presto.connector.system.GlobalSystemConnector;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageContext;
import com.facebook.presto.spi.storage.TempStorageFactory;
import com.facebook.presto.spiller.LocalTempStorage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import javax.inject.Inject;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.spiller.LocalTempStorage.TEMP_STORAGE_PATH;
import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.io.Files.getNameWithoutExtension;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * Keeps track of the registered {@link TempStorageFactory} instances and of the
 * {@link TempStorage} instances that have been created from them.
 *
 * <p>Factories are registered by name through {@link #addTempStorageFactory}. Storages are
 * created by {@link #loadTempStorages()} (or {@link #loadTempStorages(Map)}) from per-storage
 * property maps, each of which must name its factory under {@link #TEMP_STORAGE_FACTORY_NAME},
 * and are later looked up by name with {@link #getTempStorage}.
 */
public class TempStorageManager
{
    private static final Logger log = Logger.get(TempStorageManager.class);
    // TODO: Make this configurable
    private static final File TEMP_STORAGE_CONFIGURATION_DIR = new File("etc/temp-storage/");
    /** Property key whose value selects the factory used to create a temp storage. */
    public static final String TEMP_STORAGE_FACTORY_NAME = "temp-storage-factory.name";

    // Factories keyed by TempStorageFactory#getName()
    private final Map<String, TempStorageFactory> tempStorageFactories = new ConcurrentHashMap<>();
    // Created storages keyed by their configured name
    private final Map<String, TempStorage> loadedTempStorages = new ConcurrentHashMap<>();
    // Flipped to true by the first loadTempStorages(Map) call; later calls are ignored
    private final AtomicBoolean tempStorageLoading = new AtomicBoolean();

    private final NodeManager nodeManager;

    /**
     * Creates a manager whose node manager is a {@link ConnectorAwareNodeManager} built from
     * the given internal node manager, the environment of {@code nodeInfo}, and the
     * {@link GlobalSystemConnector#NAME} connector id.
     *
     * @throws NullPointerException if either argument is null
     */
    @Inject
    public TempStorageManager(InternalNodeManager internalNodeManager, NodeInfo nodeInfo)
    {
        this(new ConnectorAwareNodeManager(
                requireNonNull(internalNodeManager, "internalNodeManager is null"),
                requireNonNull(nodeInfo, "nodeInfo is null").getEnvironment(),
                new ConnectorId(GlobalSystemConnector.NAME)));
    }

    /**
     * Creates a manager that hands the given node manager to every storage it creates.
     *
     * @throws NullPointerException if {@code nodeManager} is null
     */
    @VisibleForTesting
    public TempStorageManager(NodeManager nodeManager)
    {
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
    }

    /**
     * Registers a factory under the name it reports via {@link TempStorageFactory#getName()}.
     *
     * @throws NullPointerException if {@code tempStorageFactory} is null
     * @throws IllegalArgumentException if a factory with the same name is already registered
     */
    public void addTempStorageFactory(TempStorageFactory tempStorageFactory)
    {
        requireNonNull(tempStorageFactory, "tempStorageFactory is null");

        String factoryName = tempStorageFactory.getName();
        if (tempStorageFactories.putIfAbsent(factoryName, tempStorageFactory) != null) {
            throw new IllegalArgumentException(format("Temp Storage '%s' is already registered", factoryName));
        }
    }

    /**
     * Returns the previously loaded temp storage with the given name.
     *
     * @throws NullPointerException if {@code name} is null
     * @throws IllegalStateException if no storage with that name has been loaded
     */
    public TempStorage getTempStorage(String name)
    {
        // ConcurrentHashMap rejects null keys with a message-less NPE; fail with a clear one instead
        requireNonNull(name, "name is null");

        TempStorage tempStorage = loadedTempStorages.get(name);
        checkState(tempStorage != null, "tempStorage %s was not loaded", name);

        return tempStorage;
    }

    /**
     * Registers the local temp storage factory, collects the properties of the local storage
     * plus one storage per {@code *.properties} file found in the configuration directory
     * (the storage name is the file name without extension), and loads them all.
     *
     * <p>If loading has already been started this method returns without doing anything, so
     * repeated calls neither re-register the local factory nor re-read the configuration files.
     *
     * @throws IOException if a properties file cannot be read
     */
    public void loadTempStorages()
            throws IOException
    {
        // Without this check a repeated call would fail in addTempStorageFactory below
        // ("already registered") before ever reaching the guard in loadTempStorages(Map).
        if (tempStorageLoading.get()) {
            return;
        }

        ImmutableMap.Builder<String, Map<String, String>> storageProperties = ImmutableMap.builder();
        // Always load local temp storage
        addTempStorageFactory(new LocalTempStorage.Factory());
        storageProperties.put(
                LocalTempStorage.NAME,
                // TODO: Local temp storage should be configurable
                ImmutableMap.of(
                        TEMP_STORAGE_PATH,
                        Paths.get(System.getProperty("java.io.tmpdir"), "presto", "temp_storage").toAbsolutePath().toString(),
                        TEMP_STORAGE_FACTORY_NAME,
                        // NOTE(review): presumably the name LocalTempStorage.Factory registers under - confirm
                        "local"));

        for (File file : listFiles(TEMP_STORAGE_CONFIGURATION_DIR)) {
            if (file.isFile() && file.getName().endsWith(".properties")) {
                String name = getNameWithoutExtension(file.getName());
                Map<String, String> properties = loadProperties(file);
                storageProperties.put(name, properties);
            }
        }
        loadTempStorages(storageProperties.build());
    }

    /**
     * Loads one temp storage per entry of {@code storageProperties} (storage name to its
     * properties). Only the first invocation on this manager has any effect; subsequent
     * invocations return immediately.
     *
     * @param storageProperties properties of each storage, keyed by storage name
     * @throws IOException declared for callers; not thrown by the code in this method
     */
    public void loadTempStorages(Map<String, Map<String, String>> storageProperties)
            throws IOException
    {
        if (!tempStorageLoading.compareAndSet(false, true)) {
            return;
        }

        for (Map.Entry<String, Map<String, String>> entry : storageProperties.entrySet()) {
            loadTempStorage(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Creates the temp storage {@code name} from {@code properties} and records it.
     *
     * <p>The {@link #TEMP_STORAGE_FACTORY_NAME} entry selects the registered factory; every
     * other entry is passed on to {@link TempStorageFactory#create}.
     *
     * @throws NullPointerException if {@code name} or {@code properties} is null
     * @throws IllegalStateException if the properties do not name a factory, or the named
     *         factory is not registered
     * @throws IllegalArgumentException if a storage called {@code name} is already loaded
     */
    protected void loadTempStorage(String name, Map<String, String> properties)
    {
        requireNonNull(name, "name is null");
        requireNonNull(properties, "properties is null");

        log.info("-- Loading temp storage %s --", name);

        // Split the factory selector from the properties handed to the factory
        String tempStorageFactoryName = null;
        ImmutableMap.Builder<String, String> tempStorageProperties = ImmutableMap.builder();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().equals(TEMP_STORAGE_FACTORY_NAME)) {
                tempStorageFactoryName = entry.getValue();
            }
            else {
                tempStorageProperties.put(entry.getKey(), entry.getValue());
            }
        }

        checkState(tempStorageFactoryName != null, "Configuration for tempStorage %s does not contain %s", name, TEMP_STORAGE_FACTORY_NAME);

        TempStorageFactory factory = tempStorageFactories.get(tempStorageFactoryName);
        checkState(factory != null, "Temp Storage Factory %s is not registered", tempStorageFactoryName);

        TempStorage tempStorage = factory.create(tempStorageProperties.build(), new TempStorageContext(nodeManager));
        if (loadedTempStorages.putIfAbsent(name, tempStorage) != null) {
            throw new IllegalArgumentException(format("Temp Storage '%s' is already loaded", name));
        }

        log.info("-- Loaded temp storage %s --", name);
    }

    /**
     * Returns the entries of {@code dir}, or an empty list when {@code dir} is null, is not
     * a directory, or cannot be listed.
     */
    private static List<File> listFiles(File dir)
    {
        if (dir == null || !dir.isDirectory()) {
            return ImmutableList.of();
        }
        // File#listFiles returns null when the directory cannot be read
        File[] files = dir.listFiles();
        return (files == null) ? ImmutableList.of() : ImmutableList.copyOf(files);
    }
}