TempStorageStandaloneSpiller.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spiller;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.io.DataOutput;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PageDataOutput;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.storage.SerializedStorageHandle;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempDataSink;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageHandle;
import io.airlift.slice.InputStreamSliceInput;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static com.facebook.presto.common.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
import static com.facebook.presto.execution.buffer.PageSplitterUtil.splitPage;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_SPILL_FAILURE;
import static com.google.common.collect.Iterators.transform;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

/**
 * TempStorageStandaloneSpiller is a spiller that provides basic spill
 * capabilities: spill, read, and remove. It operates over a storage handle
 * which can be serialized and passed around. The spiller keeps no per-spill
 * state (all of its fields are final collaborators and configuration), so the
 * same instance can be used to operate on multiple spill files.
 */
public class TempStorageStandaloneSpiller
        implements StandaloneSpiller
{
    private final TempDataOperationContext tempDataOperationContext;
    private final TempStorage tempStorage;
    private final PagesSerde serde;
    private final SpillerStats spillerStats;
    private final int maxBufferSizeInBytes;

    /**
     * @param tempDataOperationContext context passed to every temp storage operation
     * @param tempStorage storage used to create, open and remove spill data
     * @param serde serializer used to write pages and to read them back
     * @param spillerStats statistics sink that is updated with the serialized size of every spilled page
     * @param maxBufferSizeInBytes serialized pages are buffered until their total size exceeds
     * this value, at which point they are written to the sink in one call
     */
    public TempStorageStandaloneSpiller(
            TempDataOperationContext tempDataOperationContext,
            TempStorage tempStorage,
            PagesSerde serde,
            SpillerStats spillerStats,
            int maxBufferSizeInBytes)
    {
        this.tempDataOperationContext = requireNonNull(tempDataOperationContext, "tempDataOperationContext is null");
        this.tempStorage = requireNonNull(tempStorage, "tempStorage is null");
        this.serde = requireNonNull(serde, "serde is null");
        this.spillerStats = requireNonNull(spillerStats, "spillerStats is null");
        this.maxBufferSizeInBytes = maxBufferSizeInBytes;
    }

    /**
     * Serializes all pages produced by {@code pageIterator} into a newly created temp data sink
     * and commits it.
     * <p>
     * Every page is split into pieces of at most {@code DEFAULT_MAX_PAGE_SIZE_IN_BYTES} before
     * serialization. If anything fails before the handle is returned, the sink is rolled back;
     * the sink is closed in every case. Failures raised by rollback or close are attached to the
     * original failure as suppressed exceptions.
     *
     * @param pageIterator pages to spill
     * @return serialized handle identifying the committed spill data
     * @throws PrestoException with {@code GENERIC_SPILL_FAILURE} if an I/O error occurs
     */
    public SerializedStorageHandle spill(Iterator<Page> pageIterator)
    {
        // Fail before creating a sink that would otherwise have to be cleaned up
        requireNonNull(pageIterator, "pageIterator is null");

        List<DataOutput> bufferedPages = new ArrayList<>();
        int bufferedBytes = 0;
        // Root cause of a failed spill; cleanup failures are attached to it as suppressed exceptions
        Exception failure = null;
        TempDataSink tempDataSink = null;
        try {
            tempDataSink = tempStorage.create(tempDataOperationContext);
            while (pageIterator.hasNext()) {
                Page page = pageIterator.next();
                List<Page> splitPages = splitPage(page, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
                for (Page splitPage : splitPages) {
                    SerializedPage serializedPage = serde.serialize(splitPage);
                    spillerStats.addToTotalSpilledBytes(serializedPage.getSizeInBytes());
                    PageDataOutput pageDataOutput = new PageDataOutput(serializedPage);
                    bufferedBytes += toIntExact(pageDataOutput.size());
                    bufferedPages.add(pageDataOutput);
                    if (bufferedBytes > maxBufferSizeInBytes) {
                        flushBufferedPages(tempDataSink, bufferedPages);
                        bufferedBytes = 0;
                    }
                }
            }

            // Flush remaining buffered pages
            if (!bufferedPages.isEmpty()) {
                flushBufferedPages(tempDataSink, bufferedPages);
            }
            TempStorageHandle tempStorageHandle = tempDataSink.commit();
            return new SerializedStorageHandle(tempStorage.serializeHandle(tempStorageHandle));
        }
        catch (IOException | RuntimeException e) {
            // Runtime failures (including the PrestoException thrown by flushBufferedPages) must
            // roll the sink back as well, otherwise partially written data is left behind
            failure = e;
            rollbackQuietly(tempDataSink, e);
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new PrestoException(GENERIC_SPILL_FAILURE, "Failed to spill pages", e);
        }
        finally {
            if (tempDataSink != null) {
                try {
                    tempDataSink.close();
                }
                catch (IOException e) {
                    if (failure == null) {
                        throw new PrestoException(GENERIC_SPILL_FAILURE, "Failed to spill pages", e);
                    }
                    // Keep the original failure as the primary exception
                    if (failure != e) {
                        failure.addSuppressed(e);
                    }
                }
            }
        }
    }

    /**
     * Rolls back {@code tempDataSink} (if one was created) after a failed spill. A failure of the
     * rollback itself is recorded on {@code failure} as a suppressed exception instead of being thrown.
     */
    private static void rollbackQuietly(TempDataSink tempDataSink, Exception failure)
    {
        if (tempDataSink == null) {
            return;
        }
        try {
            tempDataSink.rollback();
        }
        catch (IOException | RuntimeException e) {
            if (failure != e) {
                failure.addSuppressed(e);
            }
        }
    }

    /**
     * Writes all buffered pages to the sink in a single call and clears the buffer on success.
     *
     * @throws PrestoException with {@code GENERIC_SPILL_FAILURE} if the write fails
     */
    private void flushBufferedPages(TempDataSink tempDataSink, List<DataOutput> bufferedPages)
    {
        try {
            tempDataSink.write(bufferedPages);
        }
        catch (UncheckedIOException | IOException e) {
            throw new PrestoException(GENERIC_SPILL_FAILURE, "Failed to spill pages", e);
        }

        bufferedPages.clear();
    }

    /**
     * Opens the spill data identified by {@code storageHandle} and returns an iterator over the
     * deserialized, compacted pages.
     * <p>
     * The underlying input stream is closed once the returned iterator is exhausted, or
     * immediately if setting up the iterator fails. A caller that abandons the iterator before
     * exhausting it leaves the stream open.
     *
     * @param storageHandle handle previously returned by {@link #spill(Iterator)}
     * @return iterator over the spilled pages
     * @throws PrestoException with {@code GENERIC_SPILL_FAILURE} if the spill data cannot be opened
     */
    public Iterator<Page> getSpilledPages(SerializedStorageHandle storageHandle)
    {
        requireNonNull(storageHandle, "storageHandle is null");

        InputStream inputStream = null;
        try {
            TempStorageHandle tempStorageHandle = tempStorage.deserialize(storageHandle.getSerializedStorageHandle());
            inputStream = tempStorage.open(tempDataOperationContext, tempStorageHandle);
            Iterator<Page> deserializedPages = PagesSerdeUtil.readPages(serde, new InputStreamSliceInput(inputStream));
            return transform(closeWhenExhausted(deserializedPages, inputStream), Page::compact);
        }
        catch (IOException | RuntimeException e) {
            // Do not leak the stream when the iterator could not be handed to the caller
            if (inputStream != null) {
                try {
                    inputStream.close();
                }
                catch (IOException closeException) {
                    e.addSuppressed(closeException);
                }
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new PrestoException(GENERIC_SPILL_FAILURE, "Failed to read spilled pages", e);
        }
    }

    /**
     * Wraps {@code delegate} so that {@code inputStream} is closed as soon as the delegate reports
     * that it has no more elements.
     */
    private static <T> Iterator<T> closeWhenExhausted(Iterator<T> delegate, InputStream inputStream)
    {
        return new Iterator<T>()
        {
            private boolean exhausted;

            @Override
            public boolean hasNext()
            {
                if (exhausted) {
                    return false;
                }
                if (delegate.hasNext()) {
                    return true;
                }
                // Mark as exhausted first so the stream is closed at most once
                exhausted = true;
                try {
                    inputStream.close();
                }
                catch (IOException e) {
                    throw new PrestoException(GENERIC_SPILL_FAILURE, "Failed to read spilled pages", e);
                }
                return false;
            }

            @Override
            public T next()
            {
                // Going through hasNext() guarantees the stream is closed even when the caller
                // iterates using next() only
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                return delegate.next();
            }
        };
    }

    /**
     * Removes the spill data identified by {@code storageHandle} from the temp storage.
     *
     * @param storageHandle handle previously returned by {@link #spill(Iterator)}
     * @throws PrestoException with {@code GENERIC_SPILL_FAILURE} if the removal fails
     */
    public void remove(SerializedStorageHandle storageHandle)
    {
        requireNonNull(storageHandle, "storageHandle is null");
        try {
            tempStorage.remove(tempDataOperationContext, tempStorage.deserialize(storageHandle.getSerializedStorageHandle()));
        }
        catch (IOException e) {
            throw new PrestoException(GENERIC_SPILL_FAILURE, "Failed to delete spill file", e);
        }
    }
}