DeltaSplitManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.delta;

import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import io.delta.kernel.data.Row;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

import javax.inject.Inject;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toMap;

public class DeltaSplitManager
        implements ConnectorSplitManager
{
    private final String connectorId;
    private final DeltaConfig deltaConfig;
    private final DeltaClient deltaClient;
    private final TypeManager typeManager;

    @Inject
    public DeltaSplitManager(DeltaConnectorId connectorId, DeltaConfig deltaConfig, DeltaClient deltaClient, TypeManager typeManager)
    {
        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
        this.deltaConfig = requireNonNull(deltaConfig, "deltaConfig is null");
        this.deltaClient = requireNonNull(deltaClient, "deltaClient is null");
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
    }

    @Override
    public ConnectorSplitSource getSplits(
            ConnectorTransactionHandle handle,
            ConnectorSession session,
            ConnectorTableLayoutHandle layout,
            SplitSchedulingContext splitSchedulingContext)
    {
        return new DeltaSplitSource(session, ((DeltaTableLayoutHandle) layout));
    }

    private class DeltaSplitSource
            implements ConnectorSplitSource
    {
        private final DeltaTable deltaTable;
        private final CloseableIterator<Row> rowIterator;
        private final int maxBatchSize;
        private final ConnectorSession session;

        DeltaSplitSource(ConnectorSession session, DeltaTableLayoutHandle deltaTableHandle)
        {
            this.session = requireNonNull(session, "session is null");
            this.deltaTable = deltaTableHandle.getTable().getDeltaTable();
            this.rowIterator = DeltaExpressionUtils.iterateWithPartitionPruning(
                    deltaClient.listFiles(session, deltaTable),
                    deltaTableHandle.getPredicate(),
                    typeManager);
            this.maxBatchSize = deltaConfig.getMaxSplitsBatchSize();
        }

        @Override
        public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
        {
            ImmutableList.Builder<ConnectorSplit> splitBuilder = ImmutableList.builder();
            long currentSplitCount = 0;
            while (rowIterator.hasNext() && currentSplitCount < maxSize && currentSplitCount < maxBatchSize) {
                Row row = rowIterator.next();
                FileStatus addFileStatus = InternalScanFileUtils.getAddFileStatus(row);
                splitBuilder.add(new DeltaSplit(
                        connectorId,
                        deltaTable.getSchemaName(),
                        deltaTable.getTableName(),
                        addFileStatus.getPath(),
                        0, /* start */
                        addFileStatus.getSize() /* split length - default is read the entire file in one split */,
                        addFileStatus.getSize(),
                        removeNullPartitionValues(InternalScanFileUtils.getPartitionValues(row)),
                        getNodeSelectionStrategy(session)));
                currentSplitCount++;
            }

            return completedFuture(new ConnectorSplitBatch(splitBuilder.build(), !rowIterator.hasNext()));
        }

        @Override
        public void close()
        {
            try {
                rowIterator.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public boolean isFinished()
        {
            return !rowIterator.hasNext();
        }
    }

    /**
     * Utility method to remove the null value partition values.
     * These null values cause problems later when used with Guava Immutable map structures.
     */
    private static Map<String, String> removeNullPartitionValues(Map<String, String> partitionValues)
    {
        return partitionValues.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .collect(toMap(Entry::getKey, Entry::getValue));
    }
}