LocalTempStorage.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spiller;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.io.DataOutput;
import com.facebook.presto.common.io.DataSink;
import com.facebook.presto.common.io.OutputStreamDataSink;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.storage.StorageCapabilities;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempDataSink;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageContext;
import com.facebook.presto.spi.storage.TempStorageFactory;
import com.facebook.presto.spi.storage.TempStorageHandle;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;

import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.spi.StandardErrorCode.OUT_OF_SPILL_SPACE;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.createDirectories;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.getFileStore;
import static java.nio.file.Files.newDirectoryStream;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.util.Objects.requireNonNull;

public class LocalTempStorage
        implements TempStorage
{
    public static final String NAME = "local";
    public static final String TEMP_STORAGE_PATH = "temp-storage.path";

    private static final Logger log = Logger.get(LocalTempStorage.class);

    private static final String SPILL_FILE_PREFIX = "spill";
    private static final String SPILL_FILE_SUFFIX = ".bin";
    private static final String SPILL_FILE_GLOB = "spill*.bin";

    private final List<Path> spillPaths;
    private final double maxUsedSpaceThreshold;

    @GuardedBy("this")
    private int roundRobinIndex;

    public LocalTempStorage(List<Path> spillPaths, double maxUsedSpaceThreshold)
    {
        this.spillPaths = ImmutableList.copyOf(requireNonNull(spillPaths, "spillPaths is null"));
        this.maxUsedSpaceThreshold = maxUsedSpaceThreshold;
        initialize();
    }

    private void initialize()
    {
        // From FileSingleStreamSpillerFactory constructor
        spillPaths.forEach(path -> {
            try {
                createDirectories(path);
            }
            catch (IOException e) {
                throw new IllegalArgumentException(
                        format("could not create spill path %s; adjust experimental.spiller-spill-path config property or filesystem permissions", path), e);
            }
            if (!path.toFile().canWrite()) {
                throw new IllegalArgumentException(
                        format("spill path %s is not writable; adjust experimental.spiller-spill-path config property or filesystem permissions", path));
            }
        });

        // From FileSingleStreamSpillerFactory#cleanupOldSpillFiles
        spillPaths.forEach(LocalTempStorage::cleanupOldSpillFiles);
    }

    @Override
    public TempDataSink create(TempDataOperationContext context)
            throws IOException
    {
        Path path = Files.createTempFile(getNextSpillPath(), SPILL_FILE_PREFIX, SPILL_FILE_SUFFIX);
        return new LocalTempDataSink(path);
    }

    @Override
    public InputStream open(TempDataOperationContext context, TempStorageHandle handle)
            throws IOException
    {
        return Files.newInputStream(((LocalTempStorageHandle) handle).getFilePath());
    }

    @Override
    public void remove(TempDataOperationContext context, TempStorageHandle handle)
            throws IOException
    {
        Files.delete(((LocalTempStorageHandle) handle).getFilePath());
    }

    @Override
    public byte[] serializeHandle(TempStorageHandle storageHandle)
    {
        URI uri = ((LocalTempStorageHandle) storageHandle).getFilePath().toUri();
        return uri.toString().getBytes(UTF_8);
    }

    @Override
    public TempStorageHandle deserialize(byte[] serializedStorageHandle)
    {
        String uriString = new String(serializedStorageHandle, UTF_8);
        try {
            return new LocalTempStorageHandle(Paths.get(new URI(uriString)));
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid URI: " + uriString, e);
        }
    }

    @Override
    public List<StorageCapabilities> getStorageCapabilities()
    {
        return ImmutableList.of();
    }

    private static void cleanupOldSpillFiles(Path path)
    {
        try (DirectoryStream<Path> stream = newDirectoryStream(path, SPILL_FILE_GLOB)) {
            stream.forEach(spillFile -> {
                try {
                    log.info("Deleting old spill file: " + spillFile);
                    delete(spillFile);
                }
                catch (Exception e) {
                    log.warn("Could not cleanup old spill file: " + spillFile);
                }
            });
        }
        catch (IOException e) {
            log.warn(e, "Error cleaning spill files");
        }
    }

    private synchronized Path getNextSpillPath()
    {
        int spillPathsCount = spillPaths.size();
        for (int i = 0; i < spillPathsCount; ++i) {
            int pathIndex = (roundRobinIndex + i) % spillPathsCount;
            Path path = spillPaths.get(pathIndex);
            if (hasEnoughDiskSpace(path)) {
                roundRobinIndex = (roundRobinIndex + i + 1) % spillPathsCount;
                return path;
            }
        }
        if (spillPaths.isEmpty()) {
            throw new PrestoException(OUT_OF_SPILL_SPACE, "No spill paths configured");
        }
        throw new PrestoException(OUT_OF_SPILL_SPACE, "No free space available for spill");
    }

    private boolean hasEnoughDiskSpace(Path path)
    {
        try {
            FileStore fileStore = getFileStore(path);
            return fileStore.getUsableSpace() > fileStore.getTotalSpace() * (1.0 - maxUsedSpaceThreshold);
        }
        catch (IOException e) {
            throw new PrestoException(OUT_OF_SPILL_SPACE, "Cannot determine free space for spill", e);
        }
    }

    private static class LocalTempStorageHandle
            implements TempStorageHandle
    {
        private final Path filePath;

        public LocalTempStorageHandle(Path filePath)
        {
            this.filePath = requireNonNull(filePath, "filePath is null");
        }

        public Path getFilePath()
        {
            return filePath;
        }

        @Override
        public String toString()
        {
            return filePath.toString();
        }
    }

    private static class LocalTempDataSink
            implements TempDataSink
    {
        private final DataSink sink;
        private final Path path;

        public LocalTempDataSink(Path path)
                throws IOException
        {
            this.path = requireNonNull(path, "path is null");
            this.sink = new OutputStreamDataSink(Files.newOutputStream(path, APPEND));
        }

        @Override
        public TempStorageHandle commit()
                throws IOException
        {
            sink.close();
            return new LocalTempStorageHandle(path);
        }

        @Override
        public void rollback()
                throws IOException
        {
            this.commit();
            Files.delete(path);
        }

        @Override
        public long size()
        {
            return sink.size();
        }

        @Override
        public long getRetainedSizeInBytes()
        {
            return sink.getRetainedSizeInBytes();
        }

        @Override
        public void write(List<DataOutput> outputData)
                throws IOException
        {
            sink.write(outputData);
        }

        @Override
        public void close()
                throws IOException
        {
            sink.close();
        }
    }

    public static class Factory
            implements TempStorageFactory
    {
        @Override
        public String getName()
        {
            return NAME;
        }

        @Override
        public TempStorage create(Map<String, String> config, TempStorageContext context)
        {
            String configPaths = config.get(TEMP_STORAGE_PATH);
            checkState(configPaths != null, "Local temp storage configuration must contain the '%s' property", TEMP_STORAGE_PATH);

            List<String> pathsSplit = ImmutableList.copyOf(Splitter.on(",").trimResults().omitEmptyStrings().split(config.get(TEMP_STORAGE_PATH)));
            List<Path> tempStoragePaths = pathsSplit.stream()
                    .map(Paths::get)
                    .collect(toImmutableList());

            // TODO: make maxUsedSpaceThreshold configurable
            return new LocalTempStorage(tempStoragePaths, 1.0);
        }
    }
}