SpoolingOutputBuffer.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.buffer;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.io.DataOutput;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PageDataOutput;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempDataSink;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageHandle;
import com.facebook.presto.util.FinalizerService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.SliceInput;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.Immutable;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.facebook.presto.execution.buffer.BufferResult.emptyResults;
import static com.facebook.presto.execution.buffer.BufferState.FINISHED;
import static com.facebook.presto.execution.buffer.BufferState.FLUSHING;
import static com.facebook.presto.execution.buffer.BufferState.NO_MORE_BUFFERS;
import static com.facebook.presto.execution.buffer.BufferState.OPEN;
import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.SPOOLING;
import static com.facebook.presto.spi.StandardErrorCode.SPOOLING_STORAGE_ERROR;
import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPages;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.advance;
import static com.google.common.collect.Range.closedOpen;
import static com.google.common.util.concurrent.Futures.catchingAsync;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

@SuppressWarnings("UnstableApiUsage")
public class SpoolingOutputBuffer
        implements OutputBuffer
{
    private final TaskId taskId;
    private final String taskInstanceId;
    private final OutputBuffers outputBuffers;
    private final StateMachine<BufferState> state;
    private final TempDataOperationContext tempDataOperationContext;
    private final TempStorage tempStorage;
    private final long thresholdInBytes;
    private final FinalizerService finalizerService;
    private final ListeningExecutorService executor;

    private final AtomicLong totalBufferedBytes = new AtomicLong();
    private final AtomicLong totalBufferedPages = new AtomicLong();
    private final AtomicLong totalPagesAdded = new AtomicLong();
    private final AtomicLong totalRowsAdded = new AtomicLong();

    private final OutputBufferId outputBufferId = new OutputBufferId(0);

    private static final Logger log = Logger.get(SpoolingOutputBuffer.class);

    private final AtomicBoolean noMorePages = new AtomicBoolean();
    private final AtomicLong currentMemorySequenceId = new AtomicLong();
    private final AtomicLong currentSequenceId = new AtomicLong();
    private final AtomicLong startPage = new AtomicLong();
    private final AtomicLong totalPagesRemaining = new AtomicLong();
    private final AtomicLong totalInMemoryBytes = new AtomicLong();
    private final AtomicLong peakMemoryUsage = new AtomicLong();

    private final AtomicLong totalStorageBytesAdded = new AtomicLong();
    private final AtomicLong totalStoragePagesAdded = new AtomicLong();

    @GuardedBy("this")
    private final Deque<HandleInfo> handleInfoQueue = new LinkedList<>();

    @GuardedBy("this")
    private final Queue<SerializedPage> pages = new ArrayDeque<>();

    @GuardedBy("this")
    private PendingRead pendingRead;

    public SpoolingOutputBuffer(
            TaskId taskId,
            String taskInstanceId,
            OutputBuffers outputBuffers,
            StateMachine<BufferState> state,
            TempStorage tempStorage,
            long thresholdInBytes,
            ListeningExecutorService executor,
            FinalizerService finalizerService)
    {
        this.taskId = requireNonNull(taskId, "taskId is null");
        this.taskInstanceId = requireNonNull(taskInstanceId, "taskInstanceIs is null");
        this.outputBuffers = requireNonNull(outputBuffers, "outputBuffers is null");
        this.state = requireNonNull(state, "state is null");
        this.tempStorage = requireNonNull(tempStorage, "tempStorage is null");
        checkArgument(thresholdInBytes >= 0, "thresholdInBytes must be >= 0");
        this.thresholdInBytes = thresholdInBytes;
        this.executor = requireNonNull(executor, "executor is null");
        this.finalizerService = requireNonNull(finalizerService, "finalizerService is null");
        this.finalizerService.addFinalizer(this, this::close);

        tempDataOperationContext = new TempDataOperationContext(Optional.empty(), taskId.getQueryId().toString(), Optional.empty(), Optional.empty(), new Identity("spooling-buffer", Optional.empty()));

        state.compareAndSet(OPEN, NO_MORE_BUFFERS);
    }

    @Override
    public OutputBufferInfo getInfo()
    {
        return new OutputBufferInfo(
                "SPOOLING",
                state.get(),
                state.get().canAddBuffers(),
                state.get().canAddPages(),
                totalBufferedBytes.get(),
                totalBufferedPages.get(),
                totalRowsAdded.get(),
                totalPagesAdded.get(),
                ImmutableList.of());
    }

    @Override
    public boolean isFinished()
    {
        return state.get() == BufferState.FINISHED;
    }

    @Override
    public double getUtilization()
    {
        return totalInMemoryBytes.get() / (double) thresholdInBytes;
    }

    @Override
    public boolean isOverutilized()
    {
        return totalInMemoryBytes.get() > thresholdInBytes;
    }

    @Override
    public long getPeakMemoryUsage()
    {
        return peakMemoryUsage.get();
    }

    @Override
    public ListenableFuture<?> isFull()
    {
        return immediateFuture(null);
    }

    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener)
    {
        state.addStateChangeListener(stateChangeListener);
    }

    @Override
    public void setOutputBuffers(OutputBuffers newOutputBuffers)
    {
        requireNonNull(newOutputBuffers, "newOutputBuffers is null");
        checkArgument(outputBuffers.getType() == SPOOLING, "Invalid output buffers type");
        checkArgument(outputBuffers.isNoMoreBufferIds(), "invalid noMoreBufferIds");

        if (state.get().isTerminal() || outputBuffers.getVersion() >= newOutputBuffers.getVersion()) {
            return;
        }

        outputBuffers.checkValidTransition(newOutputBuffers);
    }

    @Override
    public void enqueue(Lifespan lifespan, List<SerializedPage> pages)
    {
        if (!state.get().canAddPages()) {
            return;
        }
        PendingRead pendingRead;
        synchronized (this) {
            this.pages.addAll(pages);

            long bytesAdded = getPagesSize(pages);
            long pagesAdded = pages.size();

            // update output buffer info
            totalBufferedBytes.addAndGet(bytesAdded);
            totalBufferedPages.addAndGet(pagesAdded);
            totalPagesAdded.addAndGet(pagesAdded);
            totalRowsAdded.addAndGet(getPagesRows(pages));

            totalInMemoryBytes.addAndGet(bytesAdded);

            totalPagesRemaining.addAndGet(pagesAdded);
            peakMemoryUsage.accumulateAndGet(totalInMemoryBytes.get(), Math::max);

            if (totalInMemoryBytes.get() >= thresholdInBytes) {
                flush();
            }

            pendingRead = this.pendingRead;
            this.pendingRead = null;
        }

        if (pendingRead != null) {
            processPendingRead(pendingRead);
        }
    }

    @Override
    public synchronized void enqueue(Lifespan lifespan, int partition, List<SerializedPage> pages)
    {
        checkState(partition == 0, "Expected partition number to be zero");
        enqueue(lifespan, pages);
    }

    private synchronized void flush()
    {
        List<DataOutput> dataOutputs = pages.stream()
                .map(PageDataOutput::new)
                .collect(toImmutableList());

        // create a future that will hold the handle
        ListenableFuture<TempStorageHandle> handleFuture = executor.submit(() -> {
            TempDataSink dataSink = tempStorage.create(tempDataOperationContext);
            dataSink.write(dataOutputs);
            return dataSink.commit();
        });

        // store the handleFuture and file information
        long bytes = totalInMemoryBytes.get();
        int pageCount = pages.size();
        HandleInfo handleInfo = new HandleInfo(
                closedOpen(currentMemorySequenceId.get(), currentMemorySequenceId.get() + pageCount),
                handleFuture,
                bytes,
                pageCount);
        handleInfoQueue.add(handleInfo);

        // update cutoff for file pages
        currentMemorySequenceId.addAndGet(pageCount);

        // clear the pages in memory
        pages.clear();

        // update info about storage
        totalStorageBytesAdded.addAndGet(bytes);
        totalStoragePagesAdded.addAndGet(pageCount);
        totalInMemoryBytes.set(0);
    }

    @Override
    public synchronized ListenableFuture<BufferResult> get(OutputBufferId bufferId, long startSequenceId, long maxSizeInBytes)
    {
        requireNonNull(bufferId, "outputBufferId is null");
        checkArgument(bufferId.getId() == outputBufferId.getId(), "Invalid buffer id");
        checkArgument(maxSizeInBytes > 0, "maxSize must be at least 1 byte");

        acknowledge(bufferId, startSequenceId);

        long currentSequenceId = this.currentSequenceId.get();

        // process the request if we have no more data coming in, have data to read, or if this is an outdated request
        if (noMorePages.get() || !handleInfoQueue.isEmpty() || !pages.isEmpty() || currentSequenceId != startSequenceId) {
            return processRead(startSequenceId, maxSizeInBytes);
        }

        // creating a pending read, and abort the previous one
        PendingRead oldPendingRead = pendingRead;
        pendingRead = new PendingRead(taskInstanceId, currentSequenceId, maxSizeInBytes);

        if (oldPendingRead != null) {
            oldPendingRead.completeResultFutureWithEmpty();
        }

        return pendingRead.getResultFuture();
    }

    private void processPendingRead(PendingRead pendingRead)
    {
        if (pendingRead.getResultFuture().isDone()) {
            return;
        }

        ListenableFuture<BufferResult> resultFuture = processRead(pendingRead.getStartSequenceId(), pendingRead.getMaxSizeInBytes());
        pendingRead.setResultFuture(resultFuture);
    }

    private synchronized ListenableFuture<BufferResult> processRead(long startSequenceId, long maxSizeInBytes)
    {
        long currentSequenceId = this.currentSequenceId.get();

        // startSequenceId is for a page before the current page position
        if (startSequenceId < currentSequenceId) {
            return immediateFuture(emptyResults(taskInstanceId, startSequenceId, false));
        }

        // tells client that buffer is complete
        if (noMorePages.get() && handleInfoQueue.isEmpty() && pages.isEmpty()) {
            return immediateFuture(emptyResults(taskInstanceId, startSequenceId, true));
        }

        // validate previous pages were acknowledged
        checkState(currentSequenceId == startSequenceId, "Invalid startSequenceId");

        // get a copy of in memory pages and the HandleInfo to save the handleFuture
        List<HandleInfo> handleInfos = ImmutableList.copyOf(handleInfoQueue);
        List<SerializedPage> pages = ImmutableList.copyOf(this.pages);

        GetTracker getTracker = new GetTracker(maxSizeInBytes, handleInfos, pages, toIntExact(startPage.get()));

        // read pages
        ListenableFuture<List<SerializedPage>> storagePages = getPagesFromStorage(startSequenceId, getTracker);
        ListenableFuture<List<SerializedPage>> memoryPages = transform(storagePages, input -> {
            long pageCount = getTracker.getPageCount();
            long bytes = getTracker.getBytes();
            long startMemorySequenceId = startSequenceId + pageCount;
            if (startMemorySequenceId == currentMemorySequenceId.get() && (bytes < maxSizeInBytes || input.isEmpty())) {
                ImmutableList.Builder<SerializedPage> combinedPages = ImmutableList.builder();
                combinedPages.addAll(input);
                combinedPages.addAll(getPagesFromMemory(startMemorySequenceId, getTracker));
                return combinedPages.build();
            }
            return input;
        }, executor);

        ListenableFuture<BufferResult> resultFuture = transform(memoryPages, input -> {
            long newSequenceId = startSequenceId + input.size();
            return new BufferResult(taskInstanceId, startSequenceId, newSequenceId, false, 0, input);
        }, executor);

        return catchingAsync(resultFuture, Exception.class, e -> {
            log.error("Task %s: Failed to get page with startSequenceId %s", taskId, startSequenceId);
            return immediateFailedFuture(e);
        }, executor);
    }

    private ListenableFuture<List<SerializedPage>> getPagesFromStorage(long startSequenceId, GetTracker getTracker)
    {
        if (startSequenceId >= currentMemorySequenceId.get()) {
            return immediateFuture(ImmutableList.of());
        }

        Iterator<HandleInfo> handleInfoIterator = getTracker.getHandleInfos().iterator();
        HandleInfo handleInfo = handleInfoIterator.next();
        ListenableFuture<TempStorageHandle> handleFuture = handleInfo.getHandleFuture();
        return transformAsync(handleFuture, input -> getPagesFromStorage(ImmutableList.builder(), handleInfoIterator, input, getTracker), executor);
    }

    private ListenableFuture<List<SerializedPage>> getPagesFromStorage(ImmutableList.Builder<SerializedPage> resultBuilder, Iterator<HandleInfo> handleIterator, TempStorageHandle handle, GetTracker getTracker)
    {
        long maxBytes = getTracker.getMaxSizeInBytes();
        long bytes = getTracker.getBytes();
        long pageCount = getTracker.getPageCount();

        try (SliceInput inputStream = new InputStreamSliceInput(tempStorage.open(tempDataOperationContext, handle))) {
            Iterator<SerializedPage> serializedPages = readSerializedPages(inputStream);
            advance(serializedPages, getTracker.getStartPage());

            while (serializedPages.hasNext()) {
                SerializedPage page = serializedPages.next();
                long bytesRead = bytes;
                bytes += page.getRetainedSizeInBytes();

                if (pageCount != 0 && bytes > maxBytes) {
                    getTracker.update(bytesRead, pageCount);
                    return immediateFuture(resultBuilder.build());
                }
                resultBuilder.add(page);
                pageCount++;
            }

            getTracker.update(bytes, pageCount);

            if (!handleIterator.hasNext()) {
                return immediateFuture(resultBuilder.build());
            }
            return transformAsync(handleIterator.next().getHandleFuture(), input -> getPagesFromStorage(resultBuilder, handleIterator, input, getTracker), executor);
        }
        catch (IOException e) {
            throw new PrestoException(SPOOLING_STORAGE_ERROR, "Failed to read file from TempStorage", e);
        }
    }

    private List<SerializedPage> getPagesFromMemory(long startSequenceId, GetTracker getTracker)
    {
        checkArgument(startSequenceId == currentMemorySequenceId.get(), "Invalid startSequenceId for memory pages");
        checkArgument(getTracker.bytes < getTracker.maxSizeInBytes, "bytesRead is greater than maxSize");

        ImmutableList.Builder<SerializedPage> result = ImmutableList.builder();
        List<SerializedPage> pages = getTracker.getMemoryPages();
        long maxBytes = getTracker.maxSizeInBytes;
        long bytes = 0;
        long pageCount = 0;

        for (SerializedPage page : pages) {
            bytes += page.getRetainedSizeInBytes();
            if (pageCount != 0 && bytes > maxBytes) {
                break;
            }
            result.add(page);
            pageCount++;
        }

        return result.build();
    }

    @Override
    public synchronized void acknowledge(OutputBufferId bufferId, long sequenceId)
    {
        checkArgument(bufferId.getId() == outputBufferId.getId(), "Invalid buffer id");
        checkArgument(sequenceId >= 0, "Invalid sequenceId");

        // ignore if buffer is destroyed OR pages have been acknowledged already
        if (state.get() == FINISHED || sequenceId < currentSequenceId.get()) {
            return;
        }

        long oldSequenceId = currentSequenceId.get();
        int pagesToRemove = toIntExact(sequenceId - oldSequenceId);
        long currentSequenceId = oldSequenceId;

        checkArgument(pagesToRemove <= totalPagesRemaining.get(), "Invalid sequenceId");

        // remove the pages from storage
        currentSequenceId += acknowledgePagesFromStorage(sequenceId);

        // remove the pages from memory
        if (currentSequenceId < sequenceId) {
            acknowledgePagesFromMemory(sequenceId, currentSequenceId);
        }

        verify(this.currentSequenceId.compareAndSet(oldSequenceId, oldSequenceId + pagesToRemove));
    }

    private synchronized long acknowledgePagesFromStorage(long sequenceId)
    {
        long pagesAcknowledged = 0;
        long pagesRemoved = 0;
        long bytesRemoved = 0;
        List<HandleInfo> handleInfos = ImmutableList.copyOf(handleInfoQueue);

        for (HandleInfo handleInfo : handleInfos) {
            Range<Long> range = handleInfo.getRange();

            if (range.upperEndpoint() <= sequenceId) {
                handleInfo.removeFile();
                handleInfoQueue.removeFirst();

                pagesAcknowledged += handleInfo.getPageCount() - startPage.get();
                pagesRemoved += handleInfo.getPageCount();
                bytesRemoved += handleInfo.getBytes();

                startPage.set(0);
            }
            else {
                pagesAcknowledged += sequenceId - range.lowerEndpoint() - startPage.get();
                startPage.set(toIntExact(sequenceId - range.lowerEndpoint()));
                break;
            }
        }

        totalBufferedPages.addAndGet(-pagesRemoved);
        totalBufferedBytes.addAndGet(-bytesRemoved);
        totalPagesRemaining.addAndGet(-pagesAcknowledged);

        return pagesAcknowledged;
    }

    private synchronized void acknowledgePagesFromMemory(long sequenceId, long startSequenceId)
    {
        checkState(startSequenceId == currentMemorySequenceId.get(), "Invalid startSequenceId for memory pages");
        int pagesToRemove = toIntExact(sequenceId - startSequenceId);
        checkArgument(pagesToRemove <= pages.size(), "Invalid sequenceId");

        long bytesRemoved = 0;
        for (int i = 0; i < pagesToRemove; i++) {
            SerializedPage removedPage = pages.remove();
            bytesRemoved += removedPage.getRetainedSizeInBytes();
            currentMemorySequenceId.incrementAndGet();
        }

        totalBufferedPages.addAndGet(-pagesToRemove);
        totalBufferedBytes.addAndGet(-bytesRemoved);
        totalInMemoryBytes.addAndGet(-bytesRemoved);
        totalPagesRemaining.addAndGet(-pagesToRemove);
    }

    @Override
    public void abort(OutputBufferId bufferId)
    {
        checkArgument(bufferId.getId() == outputBufferId.getId(), "Invalid bufferId");
        destroy();
    }

    @Override
    public void setNoMorePages()
    {
        PendingRead pendingRead;
        synchronized (this) {
            state.compareAndSet(NO_MORE_BUFFERS, FLUSHING);
            noMorePages.set(true);

            pendingRead = this.pendingRead;
            this.pendingRead = null;

            log.info("Task %s: %s pages and %s bytes was written into TempStorage", taskId, totalStoragePagesAdded.get(), totalStorageBytesAdded.get());
        }

        if (pendingRead != null) {
            processPendingRead(pendingRead);
        }

        checkFlushComplete();
    }

    private void checkFlushComplete()
    {
        if (state.get() != FLUSHING) {
            return;
        }

        if (totalBufferedPages.get() == 0) {
            destroy();
        }
    }

    @Override
    public void destroy()
    {
        PendingRead pendingRead;
        synchronized (this) {
            if (state.setIf(FINISHED, oldState -> !oldState.isTerminal())) {
                close();
            }
            pendingRead = this.pendingRead;
            this.pendingRead = null;
        }

        if (pendingRead != null) {
            pendingRead.completeResultFutureWithEmpty();
        }
    }

    @Override
    public void fail()
    {
        state.setIf(BufferState.FAILED, oldState -> !oldState.isTerminal());
    }

    private synchronized void close()
    {
        for (HandleInfo handleInfo : handleInfoQueue) {
            handleInfo.removeFile();
        }
        pages.clear();
        handleInfoQueue.clear();
        noMorePages.set(true);
        totalBufferedPages.set(0);
        totalBufferedBytes.set(0);
        totalPagesRemaining.set(0);
    }

    @Override
    public void setNoMorePagesForLifespan(Lifespan lifespan)
    {
        // NOOP
    }

    @Override
    public void registerLifespanCompletionCallback(Consumer<Lifespan> callback)
    {
        // NOOP
    }

    @Override
    public boolean isFinishedForLifespan(Lifespan lifespan)
    {
        return isFinished();
    }

    private long getPagesSize(Collection<SerializedPage> pages)
    {
        return pages.stream().mapToLong(SerializedPage::getRetainedSizeInBytes).sum();
    }

    private long getPagesRows(Collection<SerializedPage> pages)
    {
        return pages.stream().mapToLong(SerializedPage::getPositionCount).sum();
    }

    private class HandleInfo
    {
        private final Range<Long> range;
        private final ListenableFuture<TempStorageHandle> handleFuture;
        private final long bytes;
        private final int pageCount;

        public HandleInfo(Range<Long> range, ListenableFuture<TempStorageHandle> handleFuture, long bytes, int pageCount)
        {
            this.range = requireNonNull(range, "range is null");
            this.handleFuture = requireNonNull(handleFuture, "handleFuture is null");
            this.bytes = bytes;
            this.pageCount = pageCount;
        }

        public long getBytes()
        {
            return bytes;
        }

        public int getPageCount()
        {
            return pageCount;
        }

        public Range<Long> getRange()
        {
            return range;
        }

        public ListenableFuture<TempStorageHandle> getHandleFuture()
        {
            return handleFuture;
        }

        public void removeFile()
        {
            executor.execute(() -> {
                try {
                    tempStorage.remove(tempDataOperationContext, handleFuture.get());
                }
                catch (Exception e) {
                    log.error(e, "Failed to remove file from TempStorage");
                }
            });
        }
    }

    @Immutable
    private static class PendingRead
    {
        private final String taskInstanceId;
        private final long startSequenceId;
        private final long maxSizeInBytes;
        private final SettableFuture<BufferResult> resultFuture = SettableFuture.create();

        private PendingRead(String taskInstanceId, long startSequenceId, long maxSizeInBytes)
        {
            this.taskInstanceId = requireNonNull(taskInstanceId, "taskInstanceId is null");
            this.startSequenceId = startSequenceId;
            checkArgument(maxSizeInBytes > 0, "maxSizeInBytes must be at least 1 byte");
            this.maxSizeInBytes = maxSizeInBytes;
        }

        public long getStartSequenceId()
        {
            return startSequenceId;
        }

        public long getMaxSizeInBytes()
        {
            return maxSizeInBytes;
        }

        public ListenableFuture<BufferResult> getResultFuture()
        {
            return resultFuture;
        }

        public void completeResultFutureWithEmpty()
        {
            resultFuture.set(emptyResults(taskInstanceId, startSequenceId, false));
        }

        public void setResultFuture(ListenableFuture<BufferResult> result)
        {
            resultFuture.setFuture(result);
        }
    }

    private class GetTracker
    {
        private int startPage;
        private long bytes;
        private long pageCount;

        private final long maxSizeInBytes;
        private final List<SerializedPage> pages;
        private final List<HandleInfo> handleInfos;

        private GetTracker(long maxSizeInBytes, List<HandleInfo> handleInfos, List<SerializedPage> pages, int startPage)
        {
            checkArgument(maxSizeInBytes > 0, "maxSizeInBytes must be at least 1 byte");
            this.maxSizeInBytes = maxSizeInBytes;
            this.handleInfos = requireNonNull(handleInfos, "handleInfos is null");
            this.pages = requireNonNull(pages, "pages is null");
            this.startPage = startPage;
        }

        private void update(long newBytes, long newPageCount)
        {
            bytes = newBytes;
            pageCount = newPageCount;
            startPage = 0;
        }

        private long getMaxSizeInBytes()
        {
            return maxSizeInBytes;
        }

        private int getStartPage()
        {
            return startPage;
        }

        private long getBytes()
        {
            return bytes;
        }

        private long getPageCount()
        {
            return pageCount;
        }

        private List<SerializedPage> getMemoryPages()
        {
            return pages;
        }

        private List<HandleInfo> getHandleInfos()
        {
            return handleInfos;
        }
    }
}