HiveSplitSource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.hive.InternalHiveSplit.InternalHiveBlock;
import com.facebook.presto.hive.util.AsyncQueue;
import com.facebook.presto.hive.util.AsyncQueue.BorrowResult;
import com.facebook.presto.hive.util.SizeBasedSplitWeightProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;

import javax.annotation.concurrent.GuardedBy;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

import static com.facebook.airlift.concurrent.MoreFutures.failedFuture;
import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.hive.HiveSessionProperties.isSizeBasedSplitWeightsEnabled;
import static com.facebook.presto.hive.HiveSplitSource.StateKind.CLOSED;
import static com.facebook.presto.hive.HiveSplitSource.StateKind.FAILED;
import static com.facebook.presto.hive.HiveSplitSource.StateKind.INITIAL;
import static com.facebook.presto.hive.HiveSplitSource.StateKind.NO_MORE_SPLITS;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

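/**
 * Buffers {@link InternalHiveSplit}s produced by a {@link HiveSplitLoader} and serves them to the
 * engine as {@code ConnectorSplitBatch}es, carving large files into multiple splits on the way out.
 * Three queueing strategies are available through the static factories: {@code allAtOnce} (one
 * shared queue), {@code bucketed} (one queue per bucket), and {@code bucketedRewindable} (all
 * splits are retained so a bucket can be replayed).
 *
 * <p>A rough sketch of the lifecycle; the wiring around the calls is illustrative only, since in
 * production the split loader and the engine drive these methods concurrently:
 * <pre>{@code
 * HiveSplitSource source = HiveSplitSource.allAtOnce(session, "db", "table", ...);
 * source.addToQueue(internalSplits);         // producer side: split discovery
 * source.noMoreSplits();                     // producer side: enumeration finished
 * source.getNextBatch(NOT_PARTITIONED, 100); // consumer side: the engine pulls batches
 * }</pre>
 */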
class HiveSplitSource
        implements ConnectorSplitSource
{
    private static final Logger log = Logger.get(HiveSplitSource.class);

    private final String queryId;
    private final String databaseName;
    private final String tableName;
    private final CacheQuotaRequirement cacheQuotaRequirement;
    private final PerBucket queues;
    private final AtomicInteger bufferedInternalSplitCount = new AtomicInteger();
    private final long maxOutstandingSplitsBytes;

    private final DataSize maxSplitSize;
    private final DataSize maxInitialSplitSize;
    private final boolean useRewindableSplitSource;
    private final AtomicInteger remainingInitialSplits;

    private final HiveSplitLoader splitLoader;
    private final AtomicReference<State> stateReference = new AtomicReference<>(State.initial());

    private final AtomicLong estimatedSplitSizeInBytes = new AtomicLong();

    private final CounterStat highMemorySplitSourceCounter;
    private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean();
    private final HiveSplitWeightProvider splitWeightProvider;
    private final double splitScanRatio;
    private final long affinitySchedulingFileSectionSizeInBytes;

    private HiveSplitSource(
            ConnectorSession session,
            String databaseName,
            String tableName,
            CacheQuotaRequirement cacheQuotaRequirement,
            PerBucket queues,
            int maxInitialSplits,
            DataSize maxOutstandingSplitsSize,
            HiveSplitLoader splitLoader,
            CounterStat highMemorySplitSourceCounter,
            boolean useRewindableSplitSource,
            double splitScanRatio)
    {
        requireNonNull(session, "session is null");
        this.queryId = session.getQueryId();
        this.databaseName = requireNonNull(databaseName, "databaseName is null");
        this.cacheQuotaRequirement = requireNonNull(cacheQuotaRequirement, "cacheQuotaRequirement is null");
        this.tableName = requireNonNull(tableName, "tableName is null");
        this.queues = requireNonNull(queues, "queues is null");
        this.maxOutstandingSplitsBytes = requireNonNull(maxOutstandingSplitsSize, "maxOutstandingSplitsSize is null").toBytes();
        this.splitLoader = requireNonNull(splitLoader, "splitLoader is null");
        this.highMemorySplitSourceCounter = requireNonNull(highMemorySplitSourceCounter, "highMemorySplitSourceCounter is null");

        this.maxSplitSize = getMaxSplitSize(session);
        this.maxInitialSplitSize = getMaxInitialSplitSize(session);
        this.useRewindableSplitSource = useRewindableSplitSource;
        this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
        this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
        // Clamp the value to [0.1, 1.0].
        // This ratio is used to increase split sizes. The range implies
        // 1) we do not increase split sizes by more than 10x (>= 0.1), and
        // 2) we do not decrease split sizes (<= 1.0).
        // We schedule splits only up to 10x larger, being conservative not to schedule splits with too many rows.
        // For the default size of 64MB, this keeps scheduled split sizes within 1GB. Usually files are smaller than this.
        if (!Double.isFinite(splitScanRatio)) {
            splitScanRatio = 1.0;
        }
        this.splitScanRatio = max(min(splitScanRatio, 1.0), 0.1);
        affinitySchedulingFileSectionSizeInBytes = getAffinitySchedulingFileSectionSize(session).toBytes();
    }

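    /**
     * Creates a split source backed by a single bounded {@link AsyncQueue} shared by all splits
     * regardless of bucket. Backpressure comes from the queue itself: {@code offer} returns an
     * incomplete future once roughly {@code maxOutstandingSplits} entries are buffered.
     */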
    public static HiveSplitSource allAtOnce(
            ConnectorSession session,
            String databaseName,
            String tableName,
            CacheQuotaRequirement cacheQuotaRequirement,
            int maxInitialSplits,
            int maxOutstandingSplits,
            DataSize maxOutstandingSplitsSize,
            HiveSplitLoader splitLoader,
            Executor executor,
            CounterStat highMemorySplitSourceCounter,
            double splitScanRatio)
    {
        return new HiveSplitSource(
                session,
                databaseName,
                tableName,
                cacheQuotaRequirement,
                new PerBucket()
                {
                    private final AsyncQueue<InternalHiveSplit> queue = new AsyncQueue<>(maxOutstandingSplits, executor);

                    @Override
                    public ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit connectorSplit)
                    {
                        // bucketNumber can be non-empty because BackgroundHiveSplitLoader does not have knowledge of the execution plan
                        return queue.offer(connectorSplit);
                    }

                    @Override
                    public ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function)
                    {
                        checkArgument(!bucketNumber.isPresent());
                        return queue.borrowBatchAsync(maxSize, function);
                    }

                    @Override
                    public void noMoreSplits()
                    {
                        queue.finish();
                    }

                    @Override
                    public boolean isFinished(OptionalInt bucketNumber)
                    {
                        checkArgument(!bucketNumber.isPresent());
                        return queue.isFinished();
                    }

                    @Override
                    public int rewind(OptionalInt bucketNumber)
                    {
                        throw new UnsupportedOperationException("rewind is not supported for a non-bucketed split source");
                    }
                },
                maxInitialSplits,
                maxOutstandingSplitsSize,
                splitLoader,
                highMemorySplitSourceCounter,
                false,
                splitScanRatio);
    }

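    /**
     * Creates a split source with one {@link AsyncQueue} per bucket, created lazily on first use.
     * {@code offer} never blocks here; memory is bounded instead by the
     * {@code estimatedSplitSizeInBytes} check in {@link #addToQueue(InternalHiveSplit)}.
     */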
    public static HiveSplitSource bucketed(
            ConnectorSession session,
            String databaseName,
            String tableName,
            CacheQuotaRequirement cacheQuotaRequirement,
            int maxInitialSplits,
            int estimatedOutstandingSplitsPerBucket,
            DataSize maxOutstandingSplitsSize,
            HiveSplitLoader splitLoader,
            Executor executor,
            CounterStat highMemorySplitSourceCounter,
            double splitScanRatio)
    {
        return new HiveSplitSource(
                session,
                databaseName,
                tableName,
                cacheQuotaRequirement,
                new PerBucket()
                {
                    private final Map<Integer, AsyncQueue<InternalHiveSplit>> queues = new ConcurrentHashMap<>();
                    private final AtomicBoolean finished = new AtomicBoolean();

                    @Override
                    public ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit connectorSplit)
                    {
                        AsyncQueue<InternalHiveSplit> queue = queueFor(bucketNumber);
                        queue.offer(connectorSplit);
                        // Do not block "offer" when running split discovery in bucketed mode.
                        // A limit is enforced on estimatedSplitSizeInBytes.
                        return immediateFuture(null);
                    }

                    @Override
                    public ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function)
                    {
                        return queueFor(bucketNumber).borrowBatchAsync(maxSize, function);
                    }

                    @Override
                    public void noMoreSplits()
                    {
                        if (finished.compareAndSet(false, true)) {
                            queues.values().forEach(AsyncQueue::finish);
                        }
                    }

                    @Override
                    public boolean isFinished(OptionalInt bucketNumber)
                    {
                        return queueFor(bucketNumber).isFinished();
                    }

                    @Override
                    public int rewind(OptionalInt bucketNumber)
                    {
                        throw new UnsupportedOperationException("rewind is not supported for a non-rewindable split source");
                    }

                    private AsyncQueue<InternalHiveSplit> queueFor(OptionalInt bucketNumber)
                    {
                        checkArgument(bucketNumber.isPresent());
                        AtomicBoolean isNew = new AtomicBoolean();
                        AsyncQueue<InternalHiveSplit> queue = queues.computeIfAbsent(bucketNumber.getAsInt(), ignored -> {
                            isNew.set(true);
                            return new AsyncQueue<>(estimatedOutstandingSplitsPerBucket, executor);
                        });
                        if (isNew.get() && finished.get()) {
                            // Check `finished` and invoke `queue.finish` after the `queue` is added to the map.
                            // Otherwise, `queue.finish` may not be invoked if `finished` is set while the lambda above is being evaluated.
                            queue.finish();
                        }
                        return queue;
                    }
                },
                maxInitialSplits,
                maxOutstandingSplitsSize,
                splitLoader,
                highMemorySplitSourceCounter,
                false,
                splitScanRatio);
    }

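    /**
     * Creates a rewindable split source: every {@link InternalHiveSplit} is kept in memory for the
     * lifetime of the source, and {@code borrowBatchAsync} only hands out splits once enumeration
     * has completed, so {@link #rewind} can replay a whole bucket by resetting its splits.
     */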
    public static HiveSplitSource bucketedRewindable(
            ConnectorSession session,
            String databaseName,
            String tableName,
            CacheQuotaRequirement cacheQuotaRequirement,
            int maxInitialSplits,
            DataSize maxOutstandingSplitsSize,
            HiveSplitLoader splitLoader,
            Executor executor,
            CounterStat highMemorySplitSourceCounter,
            double splitScanRatio)
    {
        return new HiveSplitSource(
                session,
                databaseName,
                tableName,
                cacheQuotaRequirement,
                new PerBucket()
                {
                    @GuardedBy("this")
                    private final Map<Integer, List<InternalHiveSplit>> splits = new HashMap<>();
                    private final SettableFuture<?> allSplitLoaded = SettableFuture.create();

                    @Override
                    public synchronized ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit connectorSplit)
                    {
                        checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                        splits.computeIfAbsent(bucketNumber.getAsInt(), ignored -> new ArrayList<>()).add(connectorSplit);
                        // Do not block "offer" when running split discovery in bucketed mode.
                        return immediateFuture(null);
                    }

                    @Override
                    public synchronized ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function)
                    {
                        checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                        if (!allSplitLoaded.isDone()) {
                            return FluentFuture.from(allSplitLoaded).transform(ignored -> ImmutableList.of(), executor);
                        }
                        return immediateFuture(function.apply(getSplits(bucketNumber.getAsInt(), maxSize)).getResult());
                    }

                    private List<InternalHiveSplit> getSplits(int bucketNumber, int batchSize)
                    {
                        return splits.getOrDefault(bucketNumber, ImmutableList.of()).stream()
                                .filter(split -> !split.isDone())
                                .limit(batchSize)
                                .collect(toImmutableList());
                    }

                    @Override
                    public void noMoreSplits()
                    {
                        allSplitLoaded.set(null);
                    }

                    @Override
                    public boolean isFinished(OptionalInt bucketNumber)
                    {
                        checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                        // For virtual buckets, not all buckets necessarily exist, so we need to handle non-existent buckets.
                        return allSplitLoaded.isDone() && (!splits.containsKey(bucketNumber.getAsInt()) || splits.get(bucketNumber.getAsInt()).stream().allMatch(InternalHiveSplit::isDone));
                    }

                    @Override
                    public synchronized int rewind(OptionalInt bucketNumber)
                    {
                        checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                        checkState(allSplitLoaded.isDone(), "splits cannot be rewound before split enumeration is finished");
                        int revivedSplitCount = 0;
                        for (InternalHiveSplit split : splits.getOrDefault(bucketNumber.getAsInt(), ImmutableList.of())) {
                            if (split.isDone()) {
                                revivedSplitCount++;
                            }
                            split.reset();
                        }
                        return revivedSplitCount;
                    }

                    @Override
                    public int decrementAndGetPartitionReferences(InternalHiveSplit split)
                    {
                        // We keep all splits forever, so references should never decrease.
                        throw new UnsupportedOperationException("decrementAndGetPartitionReferences is not supported for rewindable split sources");
                    }
                },
                maxInitialSplits,
                maxOutstandingSplitsSize,
                splitLoader,
                highMemorySplitSourceCounter,
                true,
                splitScanRatio);
    }

    /**
     * The upper bound of outstanding split count.
     * It might be larger than the actual number when called concurrently with other methods.
     */
    @VisibleForTesting
    int getBufferedInternalSplitCount()
    {
        return bufferedInternalSplitCount.get();
    }

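    // Only the future from the last offer is returned; it serves as the backpressure signal for
    // the whole batch, letting the caller pause split discovery until the queue drains.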
    ListenableFuture<?> addToQueue(List<? extends InternalHiveSplit> splits)
    {
        ListenableFuture<?> lastResult = immediateFuture(null);
        for (InternalHiveSplit split : splits) {
            lastResult = addToQueue(split);
        }
        return lastResult;
    }

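    // Charges the split's estimated footprint (plus its shared PartitionInfo, on first reference)
    // against the buffering budget before enqueueing, failing the query rather than letting
    // buffered-split memory grow without bound.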
    ListenableFuture<?> addToQueue(InternalHiveSplit split)
    {
        if (stateReference.get().getKind() != INITIAL) {
            return immediateFuture(null);
        }

        // The PartitionInfo isn't included in the size of the InternalHiveSplit
        // because it's a shared object. If this is the first InternalHiveSplit
        // for that PartitionInfo, add its cost
        if (split.getPartitionInfo().incrementAndGetReferences() == 1) {
            estimatedSplitSizeInBytes.addAndGet(split.getPartitionInfo().getEstimatedSizeInBytes());
        }

        if (estimatedSplitSizeInBytes.addAndGet(split.getEstimatedSizeInBytes()) > maxOutstandingSplitsBytes) {
            // TODO: investigate alternative split discovery strategies when this error is hit.
            // This limit should never be hit given there is a limit of maxOutstandingSplits.
            // If it's hit, it means individual splits are huge.
            if (loggedHighMemoryWarning.compareAndSet(false, true)) {
                highMemorySplitSourceCounter.update(1);
                log.warn("Split buffering for %s.%s in query %s exceeded memory limit (%s). %s splits are buffered.",
                        databaseName, tableName, queryId, succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount());
            }
            throw new PrestoException(HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT, format(
                    "Split buffering for %s.%s exceeded memory limit (%s). %s splits are buffered.",
                    databaseName, tableName, succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount()));
        }
        bufferedInternalSplitCount.incrementAndGet();
        OptionalInt bucketNumber = split.getReadBucketNumber();
        return queues.offer(bucketNumber, split);
    }

    void noMoreSplits()
    {
        if (setIf(stateReference, State.noMoreSplits(), state -> state.getKind() == INITIAL)) {
            // Stop the split loader before finishing the queue.
            // Once the queue is finished, it will always return a completed future to avoid blocking any caller.
            // This could lead to a short period of busy loop in splitLoader (although unlikely in general setup).
            splitLoader.stop();
            queues.noMoreSplits();
        }
    }

    void fail(Throwable e)
    {
        // The error must be recorded before setting the noMoreSplits marker to make sure
        // isFinished will observe failure instead of successful completion.
        // Only record the first error message.
        if (setIf(stateReference, State.failed(e), state -> state.getKind() == INITIAL)) {
            // Stop the split loader before finishing the queue.
            // Once the queue is finished, it will always return a completed future to avoid blocking any caller.
            // This could lead to a short period of busy loop in splitLoader (although unlikely in general setup).
            splitLoader.stop();
            queues.noMoreSplits();
        }
    }

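    /**
     * Borrows up to {@code maxSize} internal splits from the queue for the requested bucket and
     * carves each into a {@link HiveSplit}. Splittable files are handed out in chunks of at most
     * {@code maxSplitBytes} (smaller while {@code remainingInitialSplits} lasts, so early tasks
     * start quickly); a partially consumed internal split is returned to the queue.
     */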
    @Override
    public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
    {
        boolean noMoreSplits;
        State state = stateReference.get();
        switch (state.getKind()) {
            case INITIAL:
                noMoreSplits = false;
                break;
            case NO_MORE_SPLITS:
                noMoreSplits = true;
                break;
            case FAILED:
                return failedFuture(state.getThrowable());
            case CLOSED:
                throw new IllegalStateException("HiveSplitSource is already closed");
            default:
                throw new UnsupportedOperationException();
        }

        OptionalInt bucketNumber = toBucketNumber(partitionHandle);
        ListenableFuture<List<ConnectorSplit>> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> {
            ImmutableList.Builder<InternalHiveSplit> splitsToInsertBuilder = ImmutableList.builder();
            ImmutableList.Builder<ConnectorSplit> resultBuilder = ImmutableList.builder();
            int removedEstimatedSizeInBytes = 0;
            for (InternalHiveSplit internalSplit : internalSplits) {
                long maxSplitBytes = maxSplitSize.toBytes();
                if (remainingInitialSplits.get() > 0) {
                    if (remainingInitialSplits.getAndDecrement() > 0) {
                        maxSplitBytes = maxInitialSplitSize.toBytes();
                    }
                }
                // Increase the split size when the bytes actually scanned per split are expected to be lower.
                maxSplitBytes = (long) (maxSplitBytes / splitScanRatio);
                InternalHiveBlock block = internalSplit.currentBlock();
                long splitBytes;
                if (internalSplit.isSplittable()) {
                    long remainingBlockBytes = block.getEnd() - internalSplit.getStart();
                    if (remainingBlockBytes <= maxSplitBytes) {
                        splitBytes = remainingBlockBytes;
                    }
                    else if (maxSplitBytes * 2 >= remainingBlockBytes) {
                        // Second-to-last split in this block: generate two evenly sized splits rather than a full split plus a tiny remainder
                        splitBytes = remainingBlockBytes / 2;
                    }
                    else {
                        splitBytes = maxSplitBytes;
                    }
                }
                else {
                    splitBytes = internalSplit.getEnd() - internalSplit.getStart();
                }

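                // The final constructor argument is the affinity scheduling section index: split
                // starts within the same fixed-size file section map to the same index, letting
                // soft-affinity scheduling route sections of a file to consistent nodes.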
                HiveFileSplit fileSplit = new HiveFileSplit(
                        internalSplit.getPath(),
                        internalSplit.getStart(),
                        splitBytes,
                        internalSplit.getFileSize(),
                        internalSplit.getFileModifiedTime(),
                        internalSplit.getExtraFileInfo(),
                        internalSplit.getCustomSplitInfo(),
                        internalSplit.getStart() / affinitySchedulingFileSectionSizeInBytes);

                resultBuilder.add(new HiveSplit(
                        fileSplit,
                        databaseName,
                        tableName,
                        internalSplit.getPartitionName(),
                        internalSplit.getPartitionInfo().getStorage(),
                        internalSplit.getPartitionKeys(),
                        block.getAddresses(),
                        internalSplit.getReadBucketNumber(),
                        internalSplit.getTableBucketNumber(),
                        internalSplit.getNodeSelectionStrategy(),
                        internalSplit.getPartitionInfo().getPartitionDataColumnCount(),
                        internalSplit.getTableToPartitionMapping(),
                        internalSplit.getBucketConversion(),
                        internalSplit.isS3SelectPushdownEnabled(),
                        cacheQuotaRequirement,
                        internalSplit.getEncryptionInformation(),
                        internalSplit.getPartitionInfo().getRedundantColumnDomains(),
                        splitWeightProvider.weightForSplitSizeInBytes((long) (splitBytes * splitScanRatio)),
                        internalSplit.getPartitionInfo().getRowIdPartitionComponent()));

                internalSplit.increaseStart(splitBytes);

                if (internalSplit.isDone()) {
                    removedEstimatedSizeInBytes += internalSplit.getEstimatedSizeInBytes();

                    // rewindable split sources keep their splits forever
                    if (!useRewindableSplitSource && queues.decrementAndGetPartitionReferences(internalSplit) == 0) {
                        removedEstimatedSizeInBytes += internalSplit.getPartitionInfo().getEstimatedSizeInBytes();
                    }
                }
                else {
                    splitsToInsertBuilder.add(internalSplit);
                }
            }

            // For rewindable split source, we keep all the splits in memory.
            if (!useRewindableSplitSource) {
                estimatedSplitSizeInBytes.addAndGet(-removedEstimatedSizeInBytes);
            }

            List<InternalHiveSplit> splitsToInsert = splitsToInsertBuilder.build();
            List<ConnectorSplit> result = resultBuilder.build();
            bufferedInternalSplitCount.addAndGet(splitsToInsert.size() - result.size());

            return new AsyncQueue.BorrowResult<>(splitsToInsert, result);
        });

        ListenableFuture<ConnectorSplitBatch> transform = Futures.transform(future, splits -> {
            requireNonNull(splits, "splits is null");
            if (noMoreSplits) {
                // Checking splits.isEmpty() here is required for thread safety.
                // Let's say there are 10 splits left, and max number of splits per batch is 5.
                // The futures constructed in two getNextBatch calls could each fetch 5, resulting in zero splits left.
                // After fetching the splits, both futures reach this line at the same time.
                // Without the isEmpty check, both will claim they are the last.
                // Side note 1: In such a case, it doesn't actually matter which one gets to claim it's the last.
                //              But having both claim they are the last would be a surprising behavior.
                // Side note 2: One could argue that the isEmpty check is overly conservative.
                //              The caller of getNextBatch will likely need to make an extra invocation.
                //              But an extra invocation likely doesn't matter.
                return new ConnectorSplitBatch(splits, splits.isEmpty() && queues.isFinished(bucketNumber));
            }
            else {
                return new ConnectorSplitBatch(splits, false);
            }
        }, directExecutor());

        return toCompletableFuture(transform);
    }

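    // Replays all completed splits for the given bucket; the revived count is added back to the
    // buffered-split counter so isFinished() returns false until they are consumed again.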
    @Override
    public void rewind(ConnectorPartitionHandle partitionHandle)
    {
        bufferedInternalSplitCount.addAndGet(queues.rewind(toBucketNumber(partitionHandle)));
    }

    @Override
    public boolean isFinished()
    {
        State state = stateReference.get();

        switch (state.getKind()) {
            case INITIAL:
                return false;
            case NO_MORE_SPLITS:
                return bufferedInternalSplitCount.get() == 0;
            case FAILED:
                throw propagatePrestoException(state.getThrowable());
            case CLOSED:
                throw new IllegalStateException("HiveSplitSource is already closed");
            default:
                throw new UnsupportedOperationException();
        }
    }

    @Override
    public void close()
    {
        if (setIf(stateReference, State.closed(), state -> state.getKind() == INITIAL || state.getKind() == NO_MORE_SPLITS)) {
            // Stop the split loader before finishing the queue.
            // Once the queue is finished, it will always return a completed future to avoid blocking any caller.
            // This could lead to a short period of busy loop in splitLoader (although unlikely in general setup).
            splitLoader.stop();
            queues.noMoreSplits();
        }
    }

    private static OptionalInt toBucketNumber(ConnectorPartitionHandle partitionHandle)
    {
        if (partitionHandle == NOT_PARTITIONED) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(((HivePartitionHandle) partitionHandle).getBucket());
    }

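    // Compare-and-set loop: installs newValue only while the current value satisfies the
    // predicate, and returns whether this call performed the transition.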
    private static <T> boolean setIf(AtomicReference<T> atomicReference, T newValue, Predicate<T> predicate)
    {
        while (true) {
            T current = atomicReference.get();
            if (!predicate.test(current)) {
                return false;
            }
            if (atomicReference.compareAndSet(current, newValue)) {
                return true;
            }
        }
    }

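    // Always throws; the RuntimeException return type exists only so callers can write
    // "throw propagatePrestoException(...)" and satisfy the compiler's flow analysis.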
    private static RuntimeException propagatePrestoException(Throwable throwable)
    {
        if (throwable instanceof PrestoException) {
            throw (PrestoException) throwable;
        }
        if (throwable instanceof FileNotFoundException) {
            throw new PrestoException(HIVE_FILE_NOT_FOUND, throwable);
        }
        throw new PrestoException(HIVE_UNKNOWN_ERROR, throwable);
    }

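    /**
     * Abstraction over the three queueing strategies built by the static factories above,
     * keyed by an optional bucket number.
     */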
    interface PerBucket
    {
        ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit split);

        ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function);

        void noMoreSplits();

        boolean isFinished(OptionalInt bucketNumber);

        // returns the number of finished InternalHiveSplits that are rewound
        int rewind(OptionalInt bucketNumber);

        default int decrementAndGetPartitionReferences(InternalHiveSplit split)
        {
            return split.getPartitionInfo().decrementAndGetReferences();
        }
    }

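    /**
     * Immutable state snapshot held in {@code stateReference}; {@code throwable} is non-null
     * only for the {@code FAILED} state.
     */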
    static class State
    {
        private final StateKind kind;
        private final Throwable throwable;

        private State(StateKind kind, Throwable throwable)
        {
            this.kind = kind;
            this.throwable = throwable;
        }

        public StateKind getKind()
        {
            return kind;
        }

        public Throwable getThrowable()
        {
            checkState(throwable != null);
            return throwable;
        }

        public static State initial()
        {
            return new State(INITIAL, null);
        }

        public static State noMoreSplits()
        {
            return new State(NO_MORE_SPLITS, null);
        }

        public static State failed(Throwable throwable)
        {
            return new State(FAILED, throwable);
        }

        public static State closed()
        {
            return new State(CLOSED, null);
        }
    }

    enum StateKind
    {
        INITIAL,
        NO_MORE_SPLITS,
        FAILED,
        CLOSED,
    }
}